001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams.tck; 013 014import org.reactivestreams.Processor; 015import org.reactivestreams.Publisher; 016import org.reactivestreams.Subscriber; 017import org.reactivestreams.Subscription; 018import org.reactivestreams.tck.TestEnvironment.ManualPublisher; 019import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; 020import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; 021import org.reactivestreams.tck.TestEnvironment.Promise; 022import org.reactivestreams.tck.flow.support.Function; 023import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules; 024import org.reactivestreams.tck.flow.support.PublisherVerificationRules; 025import org.testng.annotations.BeforeMethod; 026import org.testng.annotations.Test; 027 028import java.util.HashSet; 029import java.util.Set; 030 031public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T> 032 implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules { 033 034 private final TestEnvironment env; 035 036 ////////////////////// DELEGATED TO SPECS ////////////////////// 037 038 // for delegating tests 039 private final SubscriberWhiteboxVerification<T> subscriberVerification; 040 041 // for delegating tests 042 private final PublisherVerification<T> publisherVerification; 043 044 ////////////////// END OF DELEGATED TO SPECS ////////////////// 045 046 // number of elements 
the processor under test must be able ot buffer, 047 // without dropping elements. Defaults to `TestEnvironment.TEST_BUFFER_SIZE`. 048 private final int processorBufferSize; 049 050 /** 051 * Test class must specify the expected time it takes for the publisher to 052 * shut itself down when the the last downstream {@code Subscription} is cancelled. 053 * 054 * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements. 055 */ 056 @SuppressWarnings("unused") 057 public IdentityProcessorVerification(final TestEnvironment env) { 058 this(env, PublisherVerification.envPublisherReferenceGCTimeoutMillis(), TestEnvironment.TEST_BUFFER_SIZE); 059 } 060 061 /** 062 * Test class must specify the expected time it takes for the publisher to 063 * shut itself down when the the last downstream {@code Subscription} is cancelled. 064 * 065 * The processor will be required to be able to buffer {@code TestEnvironment.TEST_BUFFER_SIZE} elements. 066 * 067 * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 068 */ 069 @SuppressWarnings("unused") 070 public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis) { 071 this(env, publisherReferenceGCTimeoutMillis, TestEnvironment.TEST_BUFFER_SIZE); 072 } 073 074 /** 075 * Test class must specify the expected time it takes for the publisher to 076 * shut itself down when the the last downstream {@code Subscription} is cancelled. 077 * 078 * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 079 * @param processorBufferSize number of elements the processor is required to be able to buffer. 
080 */ 081 public IdentityProcessorVerification(final TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) { 082 this.env = env; 083 this.processorBufferSize = processorBufferSize; 084 085 this.subscriberVerification = new SubscriberWhiteboxVerification<T>(env) { 086 @Override 087 public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) { 088 return IdentityProcessorVerification.this.createSubscriber(probe); 089 } 090 091 @Override public T createElement(int element) { 092 return IdentityProcessorVerification.this.createElement(element); 093 } 094 095 @Override 096 public Publisher<T> createHelperPublisher(long elements) { 097 return IdentityProcessorVerification.this.createHelperPublisher(elements); 098 } 099 }; 100 101 publisherVerification = new PublisherVerification<T>(env, publisherReferenceGCTimeoutMillis) { 102 @Override 103 public Publisher<T> createPublisher(long elements) { 104 return IdentityProcessorVerification.this.createPublisher(elements); 105 } 106 107 @Override 108 public Publisher<T> createFailedPublisher() { 109 return IdentityProcessorVerification.this.createFailedPublisher(); 110 } 111 112 @Override 113 public long maxElementsFromPublisher() { 114 return IdentityProcessorVerification.this.maxElementsFromPublisher(); 115 } 116 117 @Override 118 public long boundedDepthOfOnNextAndRequestRecursion() { 119 return IdentityProcessorVerification.this.boundedDepthOfOnNextAndRequestRecursion(); 120 } 121 122 @Override 123 public boolean skipStochasticTests() { 124 return IdentityProcessorVerification.this.skipStochasticTests(); 125 } 126 }; 127 } 128 129 /** 130 * This is the main method you must implement in your test incarnation. 131 * It must create a {@link Processor}, which simply forwards all stream elements from its upstream 132 * to its downstream. It must be able to internally buffer the given number of elements. 
133 * 134 * @param bufferSize number of elements the processor is required to be able to buffer. 135 */ 136 public abstract Processor<T, T> createIdentityProcessor(int bufferSize); 137 138 /** 139 * By implementing this method, additional TCK tests concerning a "failed" publishers will be run. 140 * 141 * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription, 142 * followed by signalling {@code onError} on it, as specified by Rule 1.9. 143 * 144 * If you want to ignore these additional tests, return {@code null} from this method. 145 */ 146 public abstract Publisher<T> createFailedPublisher(); 147 148 /** 149 * Override and return lower value if your Publisher is only able to produce a known number of elements. 150 * For example, if it is designed to return at-most-one element, return {@code 1} from this method. 151 * 152 * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements. 153 * 154 * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE}, 155 * which will result in *skipping all tests which require an onComplete to be triggered* (!). 156 */ 157 public long maxElementsFromPublisher() { 158 return Long.MAX_VALUE - 1; 159 } 160 161 /** 162 * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a 163 * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of 164 * recursive calls to exceed the number returned by this method. 
165 * 166 * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a> 167 * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion() 168 */ 169 public long boundedDepthOfOnNextAndRequestRecursion() { 170 return 1; 171 } 172 173 /** 174 * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}. 175 * Stochastic in this case means that the Rule is impossible or infeasible to deterministically verify— 176 * usually this means that this test case can yield false positives ("be green") even if for some case, 177 * the given implementation may violate the tested behaviour. 178 */ 179 public boolean skipStochasticTests() { 180 return false; 181 } 182 183 /** 184 * Describes the tested implementation in terms of how many subscribers they can support. 185 * Some tests require the {@code Publisher} under test to support multiple Subscribers, 186 * yet the spec does not require all publishers to be able to do so, thus – if an implementation 187 * supports only a limited number of subscribers (e.g. only 1 subscriber, also known as "no fanout") 188 * you MUST return that number from this method by overriding it. 189 */ 190 public long maxSupportedSubscribers() { 191 return Long.MAX_VALUE; 192 } 193 194 /** 195 * Override this method and return {@code true} if the {@link Processor} returned by the 196 * {@link #createIdentityProcessor(int)} coordinates its {@link Subscriber}s 197 * request amounts and only delivers onNext signals if all Subscribers have 198 * indicated (via their Subscription#request(long)) they are ready to receive elements. 
199 */ 200 public boolean doesCoordinatedEmission() { 201 return false; 202 } 203 204 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 205 206 @BeforeMethod 207 public void setUp() throws Exception { 208 publisherVerification.setUp(); 209 subscriberVerification.setUp(); 210 } 211 212 ////////////////////// PUBLISHER RULES VERIFICATION /////////////////////////// 213 214 // A Processor 215 // must obey all Publisher rules on its publishing side 216 public Publisher<T> createPublisher(long elements) { 217 final Processor<T, T> processor = createIdentityProcessor(processorBufferSize); 218 final Publisher<T> pub = createHelperPublisher(elements); 219 pub.subscribe(processor); 220 return processor; // we run the PublisherVerification against this 221 } 222 223 @Override @Test 224 public void required_validate_maxElementsFromPublisher() throws Exception { 225 publisherVerification.required_validate_maxElementsFromPublisher(); 226 } 227 228 @Override @Test 229 public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception { 230 publisherVerification.required_validate_boundedDepthOfOnNextAndRequestRecursion(); 231 } 232 233 /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" PUBLISHER ////////////////////// 234 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 235 236 @Test 237 public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable { 238 publisherVerification.required_createPublisher1MustProduceAStreamOfExactly1Element(); 239 } 240 241 @Test 242 public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable { 243 publisherVerification.required_createPublisher3MustProduceAStreamOfExactly3Elements(); 244 } 245 246 @Override @Test 247 public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable { 248 
publisherVerification.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements(); 249 } 250 251 @Override @Test 252 public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable { 253 publisherVerification.required_spec102_maySignalLessThanRequestedAndTerminateSubscription(); 254 } 255 256 @Override @Test 257 public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable { 258 publisherVerification.stochastic_spec103_mustSignalOnMethodsSequentially(); 259 } 260 261 @Override @Test 262 public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable { 263 publisherVerification.optional_spec104_mustSignalOnErrorWhenFails(); 264 } 265 266 @Override @Test 267 public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable { 268 publisherVerification.required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates(); 269 } 270 271 @Override @Test 272 public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable { 273 publisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete(); 274 } 275 276 @Override @Test 277 public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable { 278 publisherVerification.untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled(); 279 } 280 281 @Override @Test 282 public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable { 283 publisherVerification.required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled(); 284 } 285 286 @Override @Test 287 public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable { 288 publisherVerification.untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled(); 289 } 290 291 @Override @Test 292 public void 
untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable { 293 publisherVerification.untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals(); 294 } 295 296 @Override @Test 297 public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable { 298 publisherVerification.untested_spec109_subscribeShouldNotThrowNonFatalThrowable(); 299 } 300 301 @Override @Test 302 public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable { 303 publisherVerification.required_spec109_subscribeThrowNPEOnNullSubscriber(); 304 } 305 306 @Override @Test 307 public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable { 308 publisherVerification.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe(); 309 } 310 311 @Override @Test 312 public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable { 313 publisherVerification.required_spec109_mustIssueOnSubscribeForNonNullSubscriber(); 314 } 315 316 @Override @Test 317 public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable { 318 publisherVerification.untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice(); 319 } 320 321 @Override @Test 322 public void optional_spec111_maySupportMultiSubscribe() throws Throwable { 323 publisherVerification.optional_spec111_maySupportMultiSubscribe(); 324 } 325 326 @Override @Test 327 public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable { 328 publisherVerification.optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals(); 329 } 330 331 @Override @Test 332 public void 
optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { 333 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne(); 334 } 335 336 @Override @Test 337 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { 338 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront(); 339 } 340 341 @Override @Test 342 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { 343 publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected(); 344 } 345 346 @Override @Test 347 public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable { 348 publisherVerification.required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe(); 349 } 350 351 @Override @Test 352 public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable { 353 publisherVerification.required_spec303_mustNotAllowUnboundedRecursion(); 354 } 355 356 @Override @Test 357 public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception { 358 publisherVerification.untested_spec304_requestShouldNotPerformHeavyComputations(); 359 } 360 361 @Override @Test 362 public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception { 363 publisherVerification.untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation(); 364 } 365 366 @Override @Test 367 public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable { 368 
publisherVerification.required_spec306_afterSubscriptionIsCancelledRequestMustBeNops(); 369 } 370 371 @Override @Test 372 public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable { 373 publisherVerification.required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops(); 374 } 375 376 @Override @Test 377 public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable { 378 publisherVerification.required_spec309_requestZeroMustSignalIllegalArgumentException(); 379 } 380 381 @Override @Test 382 public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable { 383 publisherVerification.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException(); 384 } 385 386 @Override @Test 387 public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable { 388 publisherVerification.optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage(); 389 } 390 391 @Override @Test 392 public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable { 393 publisherVerification.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling(); 394 } 395 396 @Override @Test 397 public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable { 398 publisherVerification.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber(); 399 } 400 401 @Override @Test 402 public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable { 403 publisherVerification.required_spec317_mustSupportAPendingElementCountUpToLongMaxValue(); 404 } 405 406 @Override @Test 407 public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable { 408 
publisherVerification.required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue(); 409 } 410 411 @Override @Test 412 public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { 413 publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(); 414 } 415 416 417 /** 418 * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks if two {@code Subscriber}s 419 * receive the same items and a terminal {@code Exception}. 420 * <p> 421 * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested, 422 * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property. 423 * <p> 424 * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.4'>1.4</a> with multiple 425 * {@code Subscriber}s. 426 * <p> 427 * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2. 
428 * <p> 429 * If this test fails, the following could be checked within the {@code Processor} implementation: 430 * <ul> 431 * <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li> 432 * <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or 433 * else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s 434 * both have to request first.</li> 435 * </ul> 436 */ 437 @Test 438 public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Throwable { 439 optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() { 440 @Override 441 public TestSetup apply(Long aLong) throws Throwable { 442 return new TestSetup(env, processorBufferSize) {{ 443 final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env); 444 env.subscribe(processor, sub1); 445 446 final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env); 447 env.subscribe(processor, sub2); 448 449 final Exception ex = new RuntimeException("Test exception"); 450 451 if (doesCoordinatedEmission()) { 452 sub1.request(1); 453 sub2.request(1); 454 455 expectRequest(); 456 457 final T x = sendNextTFromUpstream(); 458 459 expectNextElement(sub1, x); 460 expectNextElement(sub2, x); 461 462 sub1.request(1); 463 sub2.request(1); 464 } else { 465 sub1.request(1); 466 467 expectRequest(env.defaultTimeoutMillis(), 468 "If the Processor coordinates requests/emissions when having multiple Subscribers" 469 + " at once, please override doesCoordinatedEmission() to return true in this " 470 + "IdentityProcessorVerification to allow this test to pass."); 471 472 final T x = sendNextTFromUpstream(); 473 expectNextElement(sub1, x, 474 "If the Processor coordinates requests/emissions when having multiple 
Subscribers" 475 + " at once, please override doesCoordinatedEmission() to return true in this " 476 + "IdentityProcessorVerification to allow this test to pass."); 477 478 sub1.request(1); 479 480 // sub1 has received one element, and has one demand pending 481 // sub2 has not yet requested anything 482 } 483 sendError(ex); 484 485 sub1.expectError(ex); 486 sub2.expectError(ex); 487 488 env.verifyNoAsyncErrorsNoDelay(); 489 }}; 490 } 491 }); 492 } 493 494 ////////////////////// SUBSCRIBER RULES VERIFICATION /////////////////////////// 495 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 496 497 // A Processor 498 // must obey all Subscriber rules on its consuming side 499 public Subscriber<T> createSubscriber(final SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<T> probe) { 500 final Processor<T, T> processor = createIdentityProcessor(processorBufferSize); 501 processor.subscribe( 502 new Subscriber<T>() { 503 private final Promise<Subscription> subs = new Promise<Subscription>(env); 504 505 @Override 506 public void onSubscribe(final Subscription subscription) { 507 if (env.debugEnabled()) { 508 env.debug(String.format("whiteboxSubscriber::onSubscribe(%s)", subscription)); 509 } 510 if (subs.isCompleted()) subscription.cancel(); // the Probe must also pass subscriber verification 511 512 probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() { 513 514 @Override 515 public void triggerRequest(long elements) { 516 subscription.request(elements); 517 } 518 519 @Override 520 public void signalCancel() { 521 subscription.cancel(); 522 } 523 }); 524 } 525 526 @Override 527 public void onNext(T element) { 528 if (env.debugEnabled()) { 529 env.debug(String.format("whiteboxSubscriber::onNext(%s)", element)); 530 } 531 probe.registerOnNext(element); 532 } 533 534 @Override 535 public void onComplete() { 536 if (env.debugEnabled()) { 537 env.debug("whiteboxSubscriber::onComplete()"); 538 } 539 
probe.registerOnComplete(); 540 } 541 542 @Override 543 public void onError(Throwable cause) { 544 if (env.debugEnabled()) { 545 env.debug(String.format("whiteboxSubscriber::onError(%s)", cause)); 546 } 547 probe.registerOnError(cause); 548 } 549 }); 550 551 return processor; // we run the SubscriberVerification against this 552 } 553 554 ////////////////////// OTHER RULE VERIFICATION /////////////////////////// 555 556 // A Processor 557 // must immediately pass on `onError` events received from its upstream to its downstream 558 @Test 559 public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception { 560 new TestSetup(env, processorBufferSize) {{ 561 final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env); 562 env.subscribe(processor, sub); 563 564 final Exception ex = new RuntimeException("Test exception"); 565 sendError(ex); 566 sub.expectError(ex); // "immediately", i.e. without a preceding request 567 568 env.verifyNoAsyncErrorsNoDelay(); 569 }}; 570 } 571 572 /////////////////////// DELEGATED TESTS, A PROCESSOR "IS A" SUBSCRIBER ////////////////////// 573 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#4.1 574 575 @Test 576 public void required_exerciseWhiteboxHappyPath() throws Throwable { 577 subscriberVerification.required_exerciseWhiteboxHappyPath(); 578 } 579 580 @Override @Test 581 public void required_spec201_mustSignalDemandViaSubscriptionRequest() throws Throwable { 582 subscriberVerification.required_spec201_mustSignalDemandViaSubscriptionRequest(); 583 } 584 585 @Override @Test 586 public void untested_spec202_shouldAsynchronouslyDispatch() throws Exception { 587 subscriberVerification.untested_spec202_shouldAsynchronouslyDispatch(); 588 } 589 590 @Override @Test 591 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable { 592 
subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete(); 593 } 594 595 @Override @Test 596 public void required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable { 597 subscriberVerification.required_spec203_mustNotCallMethodsOnSubscriptionOrPublisherInOnError(); 598 } 599 600 @Override @Test 601 public void untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception { 602 subscriberVerification.untested_spec204_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError(); 603 } 604 605 @Override @Test 606 public void required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Throwable { 607 subscriberVerification.required_spec205_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal(); 608 } 609 610 @Override @Test 611 public void untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { 612 subscriberVerification.untested_spec206_mustCallSubscriptionCancelIfItIsNoLongerValid(); 613 } 614 615 @Override @Test 616 public void untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception { 617 subscriberVerification.untested_spec207_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization(); 618 } 619 620 @Override @Test 621 public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable { 622 subscriberVerification.required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel(); 623 } 624 625 @Override @Test 626 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable { 627 subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall(); 628 
} 629 630 @Override @Test 631 public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable { 632 subscriberVerification.required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall(); 633 } 634 635 @Override @Test 636 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable { 637 subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall(); 638 } 639 640 @Override @Test 641 public void required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable { 642 subscriberVerification.required_spec210_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall(); 643 } 644 645 @Override @Test 646 public void untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception { 647 subscriberVerification.untested_spec211_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents(); 648 } 649 650 @Override @Test 651 public void untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation() throws Throwable { 652 subscriberVerification.untested_spec212_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality_specViolation(); 653 } 654 655 @Override @Test 656 public void untested_spec213_failingOnSignalInvocation() throws Exception { 657 subscriberVerification.untested_spec213_failingOnSignalInvocation(); 658 } 659 660 @Override @Test 661 public void required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 662 subscriberVerification.required_spec213_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull(); 663 } 664 @Override @Test 665 public void required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 666 
subscriberVerification.required_spec213_onNext_mustThrowNullPointerExceptionWhenParametersAreNull(); 667 } 668 @Override @Test 669 public void required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 670 subscriberVerification.required_spec213_onError_mustThrowNullPointerExceptionWhenParametersAreNull(); 671 } 672 673 @Override @Test 674 public void untested_spec301_mustNotBeCalledOutsideSubscriberContext() throws Exception { 675 subscriberVerification.untested_spec301_mustNotBeCalledOutsideSubscriberContext(); 676 } 677 678 @Override @Test 679 public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable { 680 subscriberVerification.required_spec308_requestMustRegisterGivenNumberElementsToBeProduced(); 681 } 682 683 @Override @Test 684 public void untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception { 685 subscriberVerification.untested_spec310_requestMaySynchronouslyCallOnNextOnSubscriber(); 686 } 687 688 @Override @Test 689 public void untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception { 690 subscriberVerification.untested_spec311_requestMaySynchronouslyCallOnCompleteOrOnError(); 691 } 692 693 @Override @Test 694 public void untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { 695 subscriberVerification.untested_spec314_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists(); 696 } 697 698 @Override @Test 699 public void untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception { 700 subscriberVerification.untested_spec315_cancelMustNotThrowExceptionAndMustSignalOnError(); 701 } 702 703 @Override @Test 704 public void untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception { 705 subscriberVerification.untested_spec316_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber(); 706 } 707 708 
/////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////

  /**
   * Asks for a {@code Processor} that supports at least 2 {@code Subscriber}s at once and checks requests
   * from {@code Subscriber}s will eventually lead to requests towards the upstream of the {@code Processor}.
   * <p>
   * If the {@code Processor} requests and/or emits items only when all of its {@code Subscriber}s have requested,
   * override {@link #doesCoordinatedEmission()} and return {@code true} to indicate this property.
   * <p>
   * <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#2.1'>2.1</a> with multiple
   * {@code Subscriber}s.
   * <p>
   * The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
   * <p>
   * If this test fails, the following could be checked within the {@code Processor} implementation:
   * <ul>
   *     <li>The {@code TestEnvironment} has large enough timeout specified in case the {@code Processor} has some time-delay behavior.</li>
   *     <li>The {@code Processor} is able to fulfill requests of its {@code Subscriber}s independently of each other's requests or
   *     else override {@link #doesCoordinatedEmission()} and return {@code true} to indicate the test {@code Subscriber}s
   *     both have to request first.</li>
   * </ul>
   */
  @Test
  public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo() throws Throwable {
    optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
      @Override
      public TestSetup apply(Long subscribers) throws Throwable {
        // The whole scenario runs inside the instance initializer of an anonymous TestSetup,
        // i.e. right after the TestSetup constructor has subscribed the processor to it.
        return new TestSetup(env, processorBufferSize) {{
          ManualSubscriber<T> sub1 = newSubscriber();
          sub1.request(20);

          // Running total of what the processor has requested from upstream so far.
          long totalRequests = expectRequest();
          final T x = sendNextTFromUpstream();
          expectNextElement(sub1, x);

          // One element has been sent; if exactly one had been requested, wait for more demand
          // before sending the next one.
          if (totalRequests == 1) {
            totalRequests += expectRequest();
          }
          final T y = sendNextTFromUpstream();
          expectNextElement(sub1, y);

          // Two elements have been sent; same reasoning as above.
          if (totalRequests == 2) {
            totalRequests += expectRequest();
          }

          final ManualSubscriber<T> sub2 = newSubscriber();

          // sub1 now has 18 pending
          // sub2 has 0 pending

          if (doesCoordinatedEmission()) {
            sub2.expectNone(); // since sub2 hasn't requested anything yet

            sub2.request(1);

            final T z = sendNextTFromUpstream();
            expectNextElement(sub1, z);
            expectNextElement(sub2, z);
          } else {
            final T z = sendNextTFromUpstream();
            expectNextElement(sub1, z,
                "If the Processor coordinates requests/emissions when having multiple Subscribers"
                + " at once, please override doesCoordinatedEmission() to return true in this "
                + "IdentityProcessorVerification to allow this test to pass.");
            sub2.expectNone(); // since sub2 hasn't requested anything yet

            sub2.request(1);
            expectNextElement(sub2, z);
          }
          // Three elements have been sent; consume the follow-up request if only three were requested.
          if (totalRequests == 3) {
            expectRequest();
          }

          // to avoid error messages during test harness shutdown
          sendCompletion();
          sub1.expectCompletion(env.defaultTimeoutMillis());
          sub2.expectCompletion(env.defaultTimeoutMillis());

          env.verifyNoAsyncErrorsNoDelay();
        }};
      }
    });
  }

  /////////////////////// TEST INFRASTRUCTURE //////////////////////

  /** Forwards to {@code publisherVerification.notVerified()}. */
  public void notVerified() {
    publisherVerification.notVerified();
  }

  /**
   * Forwards to {@code publisherVerification.notVerified(String)}.
   *
   * @param message text passed through unchanged to the delegate
   */
  public void notVerified(String message) {
    publisherVerification.notVerified(message);
  }

  /**
   * Test for feature that REQUIRES multiple subscribers to be supported by Publisher.
   * <p>
   * When {@link #maxSupportedSubscribers()} is smaller than {@code requiredSubscribersSupport} the body is
   * not run and {@link #notVerified(String)} is invoked instead.
   *
   * @param requiredSubscribersSupport number of simultaneous subscribers the test body needs
   * @param body the test body; it is applied to {@code requiredSubscribersSupport} and its result is discarded
   * @throws Throwable whatever {@code body} throws
   */
  public void optionalMultipleSubscribersTest(long requiredSubscribersSupport, Function<Long, TestSetup> body) throws Throwable {
    if (requiredSubscribersSupport > maxSupportedSubscribers()) {
      notVerified(String.format("The Publisher under test only supports %d subscribers, while this test requires at least %d to run.",
                                maxSupportedSubscribers(), requiredSubscribersSupport));
    } else {
      body.apply(requiredSubscribersSupport);
    }
  }

  /**
   * A {@code ManualPublisher} acting as the upstream of a freshly created identity processor.
   * <p>
   * On construction it subscribes a manual subscriber to the helper publisher (the source of test elements),
   * creates the processor under test and subscribes that processor to itself.
   */
  public abstract class TestSetup extends ManualPublisher<T> {
    private final ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
    private final Set<T> seenTees = new HashSet<T>(); // every element handed out by nextT() so far

    final Processor<T, T> processor;

    /**
     * @param env the test environment used to create the manual subscriber and passed to the super class
     * @param testBufferSize value passed to {@code createIdentityProcessor}
     * @throws InterruptedException if interrupted while setting up the subscriptions
     */
    public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
      super(env);
      tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
      processor = createIdentityProcessor(testBufferSize);
      subscribe(processor);
    }

    /** Creates a new manual subscriber that is subscribed to the processor under test. */
    public ManualSubscriber<T> newSubscriber() throws InterruptedException {
      return env.newManualSubscriber(processor);
    }

    /**
     * Requests the next element from the helper publisher.
     * Reports a failure through {@code env.flop} if that element has been handed out before.
     *
     * @return the element obtained from the helper publisher
     */
    public T nextT() throws InterruptedException {
      final T t = tees.requestNextElement();
      // Set.add returns false when the element was already present.
      if (!seenTees.add(t)) {
        env.flop(String.format("Helper publisher illegally produced the same element %s twice", t));
      }
      return t;
    }

    /**
     * Awaits the next element on {@code sub} and reports a failure through {@code env.flop}
     * if it is not equal to {@code expected}.
     *
     * @param sub the downstream subscriber to read from
     * @param expected the element that should arrive next
     */
    public void expectNextElement(ManualSubscriber<T> sub, T expected) throws InterruptedException {
      expectNextElementOrFlop(sub, expected, String.format("timeout while awaiting %s", expected));
    }

    /**
     * Same as {@link #expectNextElement(ManualSubscriber, Object)}, but appends
     * {@code errorMessageAddendum} to the message used when waiting for the element times out.
     *
     * @param sub the downstream subscriber to read from
     * @param expected the element that should arrive next
     * @param errorMessageAddendum extra hint appended to the timeout message
     */
    public void expectNextElement(ManualSubscriber<T> sub, T expected, String errorMessageAddendum) throws InterruptedException {
      expectNextElementOrFlop(sub, expected, String.format("timeout while awaiting %s. %s", expected, errorMessageAddendum));
    }

    /** Shared implementation of both {@code expectNextElement} overloads; only the timeout message differs. */
    private void expectNextElementOrFlop(ManualSubscriber<T> sub, T expected, String timeoutMessage) throws InterruptedException {
      final T elem = sub.nextElement(timeoutMessage);
      if (!elem.equals(expected)) {
        env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
      }
    }

    /**
     * Obtains a fresh element via {@link #nextT()} and emits it with {@code sendNext}.
     *
     * @return the element that was sent
     */
    public T sendNextTFromUpstream() throws InterruptedException {
      final T x = nextT();
      sendNext(x);
      return x;
    }
  }

  /**
   * A manual subscriber that stores the {@code Throwable} it receives through {@code onError}
   * in a {@code Promise}, so that a test can later assert on it with {@code expectError}.
   */
  public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
    Promise<Throwable> error;

    public ManualSubscriberWithErrorCollection(TestEnvironment env) {
      super(env);
      error = new Promise<Throwable>(env);
    }

    /** Completes the {@code error} promise with the signalled cause. */
    @Override
    public void onError(Throwable cause) {
      error.complete(cause);
    }

    /**
     * Same as {@link #expectError(Throwable, long)} using {@code env.defaultTimeoutMillis()} as the timeout.
     *
     * @param cause the error expected on downstream
     */
    public void expectError(Throwable cause) throws InterruptedException {
      expectError(cause, env.defaultTimeoutMillis());
    }

    /**
     * Expects the {@code error} promise to complete within {@code timeoutMillis} and reports a failure
     * through {@code env.flop} if {@code cause.equals(...)} does not hold for the collected error.
     *
     * @param cause the error expected on downstream; must not be {@code null} since {@code equals} is invoked on it
     * @param timeoutMillis how long to wait for the error signal, in milliseconds
     */
    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
    public void expectError(Throwable cause, long timeoutMillis) throws InterruptedException {
      error.expectCompletion(timeoutMillis, "Did not receive expected error on downstream");
      if (!cause.equals(error.value())) {
        env.flop(String.format("Expected error %s but got %s", cause, error.value()));
      }
    }
  }
}