001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams.tck; 013 014import org.reactivestreams.Publisher; 015import org.reactivestreams.Subscriber; 016import org.reactivestreams.Subscription; 017import org.reactivestreams.tck.TestEnvironment.BlackholeSubscriberWithSubscriptionSupport; 018import org.reactivestreams.tck.TestEnvironment.Latch; 019import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; 020import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; 021import org.reactivestreams.tck.support.Function; 022import org.reactivestreams.tck.support.Optional; 023import org.reactivestreams.tck.support.PublisherVerificationRules; 024import org.testng.SkipException; 025import org.testng.annotations.BeforeMethod; 026import org.testng.annotations.Test; 027 028import java.lang.Override; 029import java.lang.ref.ReferenceQueue; 030import java.lang.ref.WeakReference; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.List; 035import java.util.Random; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.concurrent.atomic.AtomicReference; 038 039import static org.testng.Assert.assertEquals; 040import static org.testng.Assert.assertTrue; 041 042/** 043 * Provides tests for verifying {@code Publisher} specification rules. 044 * 045 * @see org.reactivestreams.Publisher 046 */ 047public abstract class PublisherVerification<T> implements PublisherVerificationRules { 048 049 private static final String PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV = "PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS"; 050 private static final long DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS = 300L; 051 052 private final TestEnvironment env; 053 054 /** 055 * The amount of time after which a cancelled Subscriber reference should be dropped. 056 * See Rule 3.13 for details. 057 */ 058 private final long publisherReferenceGCTimeoutMillis; 059 060 /** 061 * Constructs a new verification class using the given env and configuration. 062 * 063 * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 064 */ 065 public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) { 066 this.env = env; 067 this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis; 068 } 069 070 /** 071 * Constructs a new verification class using the given env and configuration. 072 * 073 * The value for {@code publisherReferenceGCTimeoutMillis} will be obtained by using {@link PublisherVerification#envPublisherReferenceGCTimeoutMillis()}. 074 */ 075 public PublisherVerification(TestEnvironment env) { 076 this.env = env; 077 this.publisherReferenceGCTimeoutMillis = envPublisherReferenceGCTimeoutMillis(); 078 } 079 080 /** 081 * Tries to parse the env variable {@code PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS} as long and returns the value if present, 082 * OR its default value ({@link PublisherVerification#DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS}). 083 * 084 * This value is used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 085 * 086 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 087 */ 088 public static long envPublisherReferenceGCTimeoutMillis() { 089 final String envMillis = System.getenv(PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV); 090 if (envMillis == null) return DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS; 091 else try { 092 return Long.parseLong(envMillis); 093 } catch (NumberFormatException ex) { 094 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV, envMillis), ex); 095 } 096 } 097 098 /** 099 * This is the main method you must implement in your test incarnation. 100 * It must create a Publisher for a stream with exactly the given number of elements. 101 * If `elements` is `Long.MAX_VALUE` the produced stream must be infinite. 102 */ 103 public abstract Publisher<T> createPublisher(long elements); 104 105 /** 106 * By implementing this method, additional TCK tests concerning a "failed" publishers will be run. 107 * 108 * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription, 109 * followed by signalling {@code onError} on it, as specified by Rule 1.9. 110 * 111 * If you ignore these additional tests, return {@code null} from this method. 112 */ 113 public abstract Publisher<T> createFailedPublisher(); 114 115 116 /** 117 * Override and return lower value if your Publisher is only able to produce a known number of elements. 118 * For example, if it is designed to return at-most-one element, return {@code 1} from this method. 119 * 120 * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements. 121 * 122 * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE}, 123 * which will result in *skipping all tests which require an onComplete to be triggered* (!). 124 */ 125 public long maxElementsFromPublisher() { 126 return Long.MAX_VALUE - 1; 127 } 128 129 /** 130 * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}. 131 * Stochastic in this case means that the Rule is impossible or infeasible to deterministically verify— 132 * usually this means that this test case can yield false positives ("be green") even if for some case, 133 * the given implementation may violate the tested behaviour. 134 */ 135 public boolean skipStochasticTests() { 136 return false; 137 } 138 139 /** 140 * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a 141 * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of 142 * recursive calls to exceed the number returned by this method. 143 * 144 * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a> 145 * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion() 146 */ 147 public long boundedDepthOfOnNextAndRequestRecursion() { 148 return 1; 149 } 150 151 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 152 153 @BeforeMethod 154 public void setUp() throws Exception { 155 env.clearAsyncErrors(); 156 } 157 158 ////////////////////// TEST SETUP VERIFICATION ////////////////////////////// 159 160 @Override @Test 161 public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable { 162 activePublisherTest(1, true, new PublisherTestRun<T>() { 163 @Override 164 public void run(Publisher<T> pub) throws InterruptedException { 165 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 166 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub)); 167 sub.requestEndOfStream(); 168 } 169 170 Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException { 171 return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub)); 172 } 173 174 }); 175 } 176 177 @Override @Test 178 public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable { 179 activePublisherTest(3, true, new PublisherTestRun<T>() { 180 @Override 181 public void run(Publisher<T> pub) throws InterruptedException { 182 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 183 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub)); 184 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 1 element", pub)); 185 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 2 elements", pub)); 186 sub.requestEndOfStream(); 187 } 188 189 Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException { 190 return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub)); 191 } 192 193 }); 194 } 195 196 @Override @Test 197 public void required_validate_maxElementsFromPublisher() throws Exception { 198 assertTrue(maxElementsFromPublisher() >= 0, "maxElementsFromPublisher MUST return a number >= 0"); 199 } 200 201 @Override @Test 202 public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception { 203 assertTrue(boundedDepthOfOnNextAndRequestRecursion() >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1"); 204 } 205 206 207 ////////////////////// SPEC RULE VERIFICATION /////////////////////////////// 208 209 @Override @Test 210 public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable { 211 activePublisherTest(5, false, new PublisherTestRun<T>() { 212 @Override 213 public void run(Publisher<T> pub) throws InterruptedException { 214 215 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 216 try { 217 sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub)); 218 sub.request(1); 219 sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub)); 220 sub.expectNone(String.format("Publisher %s produced unrequested: ", pub)); 221 222 sub.request(1); 223 sub.request(2); 224 sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub)); 225 226 sub.expectNone(String.format("Publisher %sproduced unrequested ", pub)); 227 } finally { 228 sub.cancel(); 229 } 230 } 231 }); 232 } 233 234 @Override @Test 235 public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable { 236 final int elements = 3; 237 final int requested = 10; 238 239 activePublisherTest(elements, true, new PublisherTestRun<T>() { 240 @Override 241 public void run(Publisher<T> pub) throws Throwable { 242 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 243 sub.request(requested); 244 sub.nextElements(elements); 245 sub.expectCompletion(); 246 } 247 }); 248 } 249 250 @Override @Test 251 public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable { 252 final int iterations = 100; 253 final int elements = 10; 254 255 stochasticTest(iterations, new Function<Integer, Void>() { 256 @Override 257 public Void apply(final Integer runNumber) throws Throwable { 258 activePublisherTest(elements, true, new PublisherTestRun<T>() { 259 @Override 260 public void run(Publisher<T> pub) throws Throwable { 261 final Latch completionLatch = new Latch(env); 262 263 final AtomicInteger gotElements = new AtomicInteger(0); 264 pub.subscribe(new Subscriber<T>() { 265 private Subscription subs; 266 267 private ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier(); 268 269 /** 270 * Concept wise very similar to a {@link org.reactivestreams.tck.TestEnvironment.Latch}, serves to protect 271 * a critical section from concurrent access, with the added benefit of Thread tracking and same-thread-access awareness. 272 * 273 * Since a <i>Synchronous</i> Publisher may choose to synchronously (using the same {@link Thread}) call 274 * {@code onNext} directly from either {@code subscribe} or {@code request} a plain Latch is not enough 275 * to verify concurrent access safety - one needs to track if the caller is not still using the calling thread 276 * to enter subsequent critical sections ("nesting" them effectively). 277 */ 278 final class ConcurrentAccessBarrier { 279 private AtomicReference<Thread> currentlySignallingThread = new AtomicReference<Thread>(null); 280 private volatile String previousSignal = null; 281 282 public void enterSignal(String signalName) { 283 if((!currentlySignallingThread.compareAndSet(null, Thread.currentThread())) && !isSynchronousSignal()) { 284 env.flop(String.format( 285 "Illegal concurrent access detected (entering critical section)! " + 286 "%s emited %s signal, before %s finished its %s signal.", 287 Thread.currentThread(), signalName, currentlySignallingThread.get(), previousSignal)); 288 } 289 this.previousSignal = signalName; 290 } 291 292 public void leaveSignal(String signalName) { 293 currentlySignallingThread.set(null); 294 this.previousSignal = signalName; 295 } 296 297 private boolean isSynchronousSignal() { 298 return (previousSignal != null) && Thread.currentThread().equals(currentlySignallingThread.get()); 299 } 300 301 } 302 303 @Override 304 public void onSubscribe(Subscription s) { 305 final String signal = "onSubscribe()"; 306 concurrentAccessBarrier.enterSignal(signal); 307 308 subs = s; 309 subs.request(1); 310 311 concurrentAccessBarrier.leaveSignal(signal); 312 } 313 314 @Override 315 public void onNext(T ignore) { 316 final String signal = String.format("onNext(%s)", ignore); 317 concurrentAccessBarrier.enterSignal(signal); 318 319 if (gotElements.incrementAndGet() <= elements) // requesting one more than we know are in the stream (some Publishers need this) 320 subs.request(1); 321 322 concurrentAccessBarrier.leaveSignal(signal); 323 } 324 325 @Override 326 public void onError(Throwable t) { 327 final String signal = String.format("onError(%s)", t.getMessage()); 328 concurrentAccessBarrier.enterSignal(signal); 329 330 // ignore value 331 332 concurrentAccessBarrier.leaveSignal(signal); 333 } 334 335 @Override 336 public void onComplete() { 337 final String signal = "onComplete()"; 338 concurrentAccessBarrier.enterSignal(signal); 339 340 // entering for completeness 341 342 concurrentAccessBarrier.leaveSignal(signal); 343 completionLatch.close(); 344 } 345 }); 346 347 completionLatch.expectClose( 348 elements * env.defaultTimeoutMillis(), 349 String.format("Failed in iteration %d of %d. Expected completion signal after signalling %d elements (signalled %d), yet did not receive it", 350 runNumber, iterations, elements, gotElements.get())); 351 } 352 }); 353 return null; 354 } 355 }); 356 } 357 358 @Override @Test 359 public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable { 360 try { 361 whenHasErrorPublisherTest(new PublisherTestRun<T>() { 362 @Override 363 public void run(final Publisher<T> pub) throws InterruptedException { 364 final Latch onErrorlatch = new Latch(env); 365 final Latch onSubscribeLatch = new Latch(env); 366 pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) { 367 @Override 368 public void onSubscribe(Subscription subs) { 369 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 370 onSubscribeLatch.close(); 371 } 372 @Override 373 public void onError(Throwable cause) { 374 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 375 onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); 376 onErrorlatch.close(); 377 } 378 }); 379 380 onSubscribeLatch.expectClose("Should have received onSubscribe"); 381 onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub)); 382 383 env.verifyNoAsyncErrors(); 384 } 385 }); 386 } catch (SkipException se) { 387 throw se; 388 } catch (Throwable ex) { 389 // we also want to catch AssertionErrors and anything the publisher may have thrown inside subscribe 390 // which was wrong of him - he should have signalled on error using onError 391 throw new RuntimeException(String.format("Publisher threw exception (%s) instead of signalling error via onError!", ex.getMessage()), ex); 392 } 393 } 394 395 @Override @Test 396 public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable { 397 activePublisherTest(3, true, new PublisherTestRun<T>() { 398 @Override 399 public void run(Publisher<T> pub) throws Throwable { 400 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 401 sub.requestNextElement(); 402 sub.requestNextElement(); 403 sub.requestNextElement(); 404 sub.requestEndOfStream(); 405 sub.expectNone(); 406 } 407 }); 408 } 409 410 @Override @Test 411 public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable { 412 optionalActivePublisherTest(0, true, new PublisherTestRun<T>() { 413 @Override 414 public void run(Publisher<T> pub) throws Throwable { 415 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 416 sub.request(1); 417 sub.expectCompletion(); 418 sub.expectNone(); 419 } 420 }); 421 } 422 423 @Override @Test 424 public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable { 425 notVerified(); // not really testable without more control over the Publisher 426 } 427 428 @Override @Test 429 public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable { 430 activePublisherTest(1, true, new PublisherTestRun<T>() { 431 @Override 432 public void run(Publisher<T> pub) throws Throwable { 433 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 434 sub.request(10); 435 sub.nextElement(); 436 sub.expectCompletion(); 437 438 sub.request(10); 439 sub.expectNone(); 440 } 441 }); 442 } 443 444 @Override @Test 445 public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable { 446 notVerified(); // can we meaningfully test this, without more control over the publisher? 447 } 448 449 @Override @Test 450 public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable { 451 notVerified(); // can we meaningfully test this? 452 } 453 454 @Override @Test 455 public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable { 456 notVerified(); // can we meaningfully test this? 457 } 458 459 @Override @Test 460 public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable { 461 activePublisherTest(0, false, new PublisherTestRun<T>() { 462 @Override 463 public void run(Publisher<T> pub) throws Throwable { 464 try { 465 pub.subscribe(null); 466 env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe"); 467 } catch (NullPointerException ignored) { 468 // valid behaviour 469 } 470 env.verifyNoAsyncErrorsNoDelay(); 471 } 472 }); 473 } 474 475 @Override @Test 476 public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable { 477 activePublisherTest(0, false, new PublisherTestRun<T>() { 478 @Override 479 public void run(Publisher<T> pub) throws Throwable { 480 final Latch onSubscribeLatch = new Latch(env); 481 final AtomicReference<Subscription> cancel = new AtomicReference<Subscription>(); 482 try { 483 pub.subscribe(new Subscriber<T>() { 484 @Override 485 public void onError(Throwable cause) { 486 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 487 } 488 489 @Override 490 public void onSubscribe(Subscription subs) { 491 cancel.set(subs); 492 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 493 onSubscribeLatch.close(); 494 } 495 496 @Override 497 public void onNext(T elem) { 498 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always"); 499 } 500 501 @Override 502 public void onComplete() { 503 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always"); 504 } 505 }); 506 onSubscribeLatch.expectClose("Should have received onSubscribe"); 507 env.verifyNoAsyncErrorsNoDelay(); 508 } finally { 509 Subscription s = cancel.getAndSet(null); 510 if (s != null) { 511 s.cancel(); 512 } 513 } 514 } 515 }); 516 } 517 518 @Override @Test 519 public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable { 520 whenHasErrorPublisherTest(new PublisherTestRun<T>() { 521 @Override 522 public void run(Publisher<T> pub) throws Throwable { 523 final Latch onErrorLatch = new Latch(env); 524 final Latch onSubscribeLatch = new Latch(env); 525 ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 526 @Override 527 public void onError(Throwable cause) { 528 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 529 onErrorLatch.assertOpen("Only one onError call expected"); 530 onErrorLatch.close(); 531 } 532 533 @Override 534 public void onSubscribe(Subscription subs) { 535 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 536 onSubscribeLatch.close(); 537 } 538 }; 539 pub.subscribe(sub); 540 onSubscribeLatch.expectClose("Should have received onSubscribe"); 541 onErrorLatch.expectClose("Should have received onError"); 542 543 env.verifyNoAsyncErrorsNoDelay(); 544 } 545 }); 546 } 547 548 @Override @Test 549 public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable { 550 notVerified(); // can we meaningfully test this? 551 } 552 553 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11 554 @Override @Test 555 public void optional_spec111_maySupportMultiSubscribe() throws Throwable { 556 optionalActivePublisherTest(1, false, new PublisherTestRun<T>() { 557 @Override 558 public void run(Publisher<T> pub) throws Throwable { 559 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 560 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 561 562 try { 563 env.verifyNoAsyncErrors(); 564 } finally { 565 try { 566 sub1.cancel(); 567 } finally { 568 sub2.cancel(); 569 } 570 } 571 } 572 }); 573 } 574 575 @Override @Test 576 public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable { 577 optionalActivePublisherTest(1, false, new PublisherTestRun<T>() { 578 @Override 579 public void run(Publisher<T> pub) throws Throwable { 580 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 581 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 582 // Since we're testing the case when the Publisher DOES support the optional multi-subscribers scenario, 583 // and decides if it handles them uni-cast or multi-cast, we don't know which subscriber will receive an 584 // onNext (and optional onComplete) signal(s) and which just onComplete signal. 585 // Plus, even if subscription assumed to be unicast, it's implementation choice, which one will be signalled 586 // with onNext. 587 sub1.requestNextElementOrEndOfStream(); 588 sub2.requestNextElementOrEndOfStream(); 589 try { 590 env.verifyNoAsyncErrors(); 591 } finally { 592 try { 593 sub1.cancel(); 594 } finally { 595 sub2.cancel(); 596 } 597 } 598 } 599 }); 600 } 601 602 @Override @Test 603 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { 604 optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete) 605 @Override 606 public void run(Publisher<T> pub) throws InterruptedException { 607 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 608 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 609 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 610 611 sub1.request(1); 612 T x1 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 613 sub2.request(2); 614 List<T> y1 = sub2.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 2nd subscriber", pub)); 615 sub1.request(1); 616 T x2 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 617 sub3.request(3); 618 List<T> z1 = sub3.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 3rd subscriber", pub)); 619 sub3.request(1); 620 T z2 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub)); 621 sub3.request(1); 622 T z3 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub)); 623 sub3.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 3rd subscriber", pub)); 624 sub2.request(3); 625 List<T> y2 = sub2.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 2nd subscriber", pub)); 626 sub2.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 2nd subscriber", pub)); 627 sub1.request(2); 628 List<T> x3 = sub1.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 1st subscriber", pub)); 629 sub1.request(1); 630 T x4 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 631 sub1.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 1st subscriber", pub)); 632 633 @SuppressWarnings("unchecked") 634 List<T> r = new ArrayList<T>(Arrays.asList(x1, x2)); 635 r.addAll(x3); 636 r.addAll(Collections.singleton(x4)); 637 638 List<T> check1 = new ArrayList<T>(y1); 639 check1.addAll(y2); 640 641 //noinspection unchecked 642 List<T> check2 = new ArrayList<T>(z1); 643 check2.add(z2); 644 check2.add(z3); 645 646 assertEquals(r, check1, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 2", pub)); 647 assertEquals(r, check2, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 3", pub)); 648 } 649 }); 650 } 651 652 @Override @Test 653 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { 654 optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements 655 @Override 656 public void run(Publisher<T> pub) throws Throwable { 657 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 658 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 659 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 660 661 List<T> received1 = new ArrayList<T>(); 662 List<T> received2 = new ArrayList<T>(); 663 List<T> received3 = new ArrayList<T>(); 664 665 // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains... 666 // edgy edge case? 667 sub1.request(4); 668 sub2.request(4); 669 sub3.request(4); 670 671 received1.addAll(sub1.nextElements(3)); 672 received2.addAll(sub2.nextElements(3)); 673 received3.addAll(sub3.nextElements(3)); 674 675 // NOTE: can't check completion, the Publisher may not be able to signal it 676 // a similar test *with* completion checking is implemented 677 678 assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers")); 679 assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers")); 680 } 681 }); 682 } 683 684 @Override @Test 685 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { 686 optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete) 687 @Override 688 public void run(Publisher<T> pub) throws Throwable { 689 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 690 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 691 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 692 693 List<T> received1 = new ArrayList<T>(); 694 List<T> received2 = new ArrayList<T>(); 695 List<T> received3 = new ArrayList<T>(); 696 697 // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains... 698 // edgy edge case? 699 sub1.request(4); 700 sub2.request(4); 701 sub3.request(4); 702 703 received1.addAll(sub1.nextElements(3)); 704 received2.addAll(sub2.nextElements(3)); 705 received3.addAll(sub3.nextElements(3)); 706 707 sub1.expectCompletion(); 708 sub2.expectCompletion(); 709 sub3.expectCompletion(); 710 711 assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers")); 712 assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers")); 713 } 714 }); 715 } 716 717 ///////////////////// SUBSCRIPTION TESTS ////////////////////////////////// 718 719 @Override @Test 720 public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable { 721 activePublisherTest(6, false, new PublisherTestRun<T>() { 722 @Override 723 public void run(Publisher<T> pub) throws Throwable { 724 ManualSubscriber<T> sub = new ManualSubscriber<T>(env) { 725 @Override 726 public void onSubscribe(Subscription subs) { 727 this.subscription.completeImmediatly(subs); 728 729 subs.request(1); 730 subs.request(1); 731 subs.request(1); 732 } 733 734 @Override 735 public void onNext(T element) { 736 Subscription subs = this.subscription.value(); 737 subs.request(1); 738 } 739 }; 740 741 env.subscribe(pub, sub); 742 743 env.verifyNoAsyncErrors(); 744 } 745 }); 746 } 747 748 @Override @Test 749 public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable { 750 final long oneMoreThanBoundedLimit = boundedDepthOfOnNextAndRequestRecursion() + 1; 751 752 activePublisherTest(oneMoreThanBoundedLimit, false, new PublisherTestRun<T>() { 753 @Override 754 public void run(Publisher<T> pub) throws Throwable { 755 final ThreadLocal<Long> stackDepthCounter = new ThreadLocal<Long>() { 756 @Override 757 protected Long initialValue() { 758 return 0L; 759 } 760 }; 761 762 final Latch runCompleted = new Latch(env); 763 764 final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 765 // counts the number of signals received, used to break out from possibly infinite request/onNext loops 766 long signalsReceived = 0L; 767 768 @Override 769 public void onNext(T element) { 770 // NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements 771 // which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing 772 773 signalsReceived += 1; 774 stackDepthCounter.set(stackDepthCounter.get() + 1); 775 env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element)); 776 777 final long callsUntilNow = stackDepthCounter.get(); 778 if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) { 779 env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d", 780 callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion())); 781 782 // stop the recursive call chain 783 runCompleted.close(); 784 return; 785 } else if (signalsReceived >= oneMoreThanBoundedLimit) { 786 // since max number of signals reached, and recursion depth not exceeded, we judge this as a success and 787 // stop the recursive call chain 788 runCompleted.close(); 789 return; 790 } 791 792 // request more right away, the Publisher must break the recursion 793 subscription.value().request(1); 794 795 stackDepthCounter.set(stackDepthCounter.get() - 1); 796 } 797 798 @Override 799 public void onComplete() { 800 super.onComplete(); 801 runCompleted.close(); 802 } 803 804 @Override 805 public void onError(Throwable cause) { 806 super.onError(cause); 807 runCompleted.close(); 808 } 809 }; 810 811 try { 812 env.subscribe(pub, sub); 813 814 sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...` 815 816 final String msg = String.format("Unable to validate call stack depth safety, " + 817 "awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion", 818 oneMoreThanBoundedLimit); 819 runCompleted.expectClose(env.defaultTimeoutMillis(), msg); 820 env.verifyNoAsyncErrorsNoDelay(); 821 } finally { 822 // since the request/onNext recursive calls may keep the publisher running "forever", 823 // we MUST cancel it manually before exiting this test case 824 sub.cancel(); 825 } 826 } 827 }); 828 } 829 830 @Override @Test 831 public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception { 832 notVerified(); // cannot be meaningfully tested, or can it? 833 } 834 835 @Override @Test 836 public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception { 837 notVerified(); // cannot be meaningfully tested, or can it? 838 } 839 840 @Override @Test 841 public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable { 842 activePublisherTest(3, false, new PublisherTestRun<T>() { 843 @Override 844 public void run(Publisher<T> pub) throws Throwable { 845 846 // override ManualSubscriberWithSubscriptionSupport#cancel because by default a ManualSubscriber will drop the 847 // subscription once it's cancelled (as expected). 848 // In this test however it must keep the cancelled Subscription and keep issuing `request(long)` to it. 849 ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 850 @Override 851 public void cancel() { 852 if (subscription.isCompleted()) { 853 subscription.value().cancel(); 854 } else { 855 env.flop("Cannot cancel a subscription before having received it"); 856 } 857 } 858 }; 859 860 env.subscribe(pub, sub); 861 862 sub.cancel(); 863 sub.request(1); 864 sub.request(1); 865 sub.request(1); 866 867 sub.expectNone(); 868 env.verifyNoAsyncErrorsNoDelay(); 869 } 870 }); 871 } 872 873 @Override @Test 874 public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable { 875 activePublisherTest(1, false, new PublisherTestRun<T>() { 876 @Override 877 public void run(Publisher<T> pub) throws Throwable { 878 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 879 880 // leak the Subscription 881 final Subscription subs = sub.subscription.value(); 882 883 subs.cancel(); 884 subs.cancel(); 885 subs.cancel(); 886 887 sub.expectNone(); 888 env.verifyNoAsyncErrorsNoDelay(); 889 } 890 }); 891 } 892 893 @Override @Test 894 public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable { 895 activePublisherTest(10, false, new PublisherTestRun<T>() { 896 @Override public void run(Publisher<T> pub) throws Throwable { 897 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 898 sub.request(0); 899 sub.expectError(IllegalArgumentException.class); 900 } 901 }); 902 } 903 904 @Override @Test 905 public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable { 906 activePublisherTest(10, false, new PublisherTestRun<T>() { 907 @Override 908 public void run(Publisher<T> pub) throws Throwable { 909 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 910 final Random r = new Random(); 911 sub.request(-r.nextInt(Integer.MAX_VALUE) - 1); 912 // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem 913 sub.expectError(IllegalArgumentException.class); 914 } 915 }); 916 } 917 918 @Override @Test 919 public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable { 920 optionalActivePublisherTest(10, false, new PublisherTestRun<T>() { 921 @Override 922 public void run(Publisher<T> pub) throws Throwable { 923 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 924 final Random r = new Random(); 925 sub.request(-r.nextInt(Integer.MAX_VALUE) - 1); 926 // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem 927 sub.expectErrorWithMessage(IllegalArgumentException.class, Arrays.asList("3.9", "non-positive subscription request", "negative subscription request")); 928 } 929 }); 930 } 931 932 @Override @Test 933 public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable { 934 // the publisher is able to signal more elements than the subscriber will be requesting in total 935 final int publisherElements = 20; 936 937 final int demand1 = 10; 938 final int demand2 = 5; 939 final int totalDemand = demand1 + demand2; 940 941 activePublisherTest(publisherElements, false, new PublisherTestRun<T>() { 942 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 943 public void run(Publisher<T> pub) throws Throwable { 944 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 945 946 sub.request(demand1); 947 sub.request(demand2); 948 949 /* 950 NOTE: The order of the nextElement/cancel calls below is very important (!) 951 952 If this ordering was reversed, given an asynchronous publisher, 953 the following scenario would be *legal* and would break this test: 954 955 > AsyncPublisher receives request(10) - it does not emit data right away, it's asynchronous 956 > AsyncPublisher receives request(5) - demand is now 15 957 ! AsyncPublisher didn't emit any onNext yet (!) 958 > AsyncPublisher receives cancel() - handles it right away, by "stopping itself" for example 959 ! cancel was handled hefore the AsyncPublisher ever got the chance to emit data 960 ! the subscriber ends up never receiving even one element - the test is stuck (and fails, even on valid Publisher) 961 962 Which is why we must first expect an element, and then cancel, once the producing is "running". 963 */ 964 sub.nextElement(); 965 sub.cancel(); 966 967 int onNextsSignalled = 1; 968 969 boolean stillBeingSignalled; 970 do { 971 // put asyncError if onNext signal received 972 sub.expectNone(); 973 Throwable error = env.dropAsyncError(); 974 975 if (error == null) { 976 stillBeingSignalled = false; 977 } else { 978 onNextsSignalled += 1; 979 stillBeingSignalled = true; 980 } 981 982 // if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw 983 assertTrue(onNextsSignalled <= totalDemand, 984 String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d", 985 onNextsSignalled, totalDemand)); 986 987 } while (stillBeingSignalled); 988 } 989 }); 990 991 env.verifyNoAsyncErrorsNoDelay(); 992 } 993 994 @Override @Test 995 public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable { 996 final ReferenceQueue<ManualSubscriber<T>> queue = new ReferenceQueue<ManualSubscriber<T>>(); 997 998 final Function<Publisher<T>, WeakReference<ManualSubscriber<T>>> run = new Function<Publisher<T>, WeakReference<ManualSubscriber<T>>>() { 999 @Override 1000 public WeakReference<ManualSubscriber<T>> apply(Publisher<T> pub) throws Exception { 1001 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 1002 final WeakReference<ManualSubscriber<T>> ref = new WeakReference<ManualSubscriber<T>>(sub, queue); 1003 1004 sub.request(1); 1005 sub.nextElement(); 1006 sub.cancel(); 1007 1008 return ref; 1009 } 1010 }; 1011 1012 activePublisherTest(3, false, new PublisherTestRun<T>() { 1013 @Override 1014 public void run(Publisher<T> pub) throws Throwable { 1015 final WeakReference<ManualSubscriber<T>> ref = run.apply(pub); 1016 1017 // cancel may be run asynchronously so we add a sleep before running the GC 1018 // to "resolve" the race 1019 Thread.sleep(publisherReferenceGCTimeoutMillis); 1020 System.gc(); 1021 1022 if (!ref.equals(queue.remove(100))) { 1023 env.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub)); 1024 } 1025 1026 env.verifyNoAsyncErrorsNoDelay(); 1027 } 1028 }); 1029 } 1030 1031 @Override @Test 1032 public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable { 1033 final int totalElements = 3; 1034 1035 activePublisherTest(totalElements, true, new PublisherTestRun<T>() { 1036 @Override 1037 public void run(Publisher<T> pub) throws Throwable { 1038 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 1039 sub.request(Long.MAX_VALUE); 1040 1041 sub.nextElements(totalElements); 1042 sub.expectCompletion(); 1043 1044 env.verifyNoAsyncErrorsNoDelay(); 1045 } 1046 }); 1047 } 1048 1049 @Override @Test 1050 public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable { 1051 final int totalElements = 3; 1052 1053 activePublisherTest(totalElements, true, new PublisherTestRun<T>() { 1054 @Override 1055 public void run(Publisher<T> pub) throws Throwable { 1056 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 1057 sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2 1058 sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1 1059 sub.request(1); // pending = Long.MAX_VALUE 1060 1061 sub.nextElements(totalElements); 1062 sub.expectCompletion(); 1063 1064 try { 1065 env.verifyNoAsyncErrorsNoDelay(); 1066 } finally { 1067 sub.cancel(); 1068 } 1069 1070 } 1071 }); 1072 } 1073 1074 @Override @Test 1075 public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { 1076 activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() { 1077 @Override public void run(Publisher<T> pub) throws Throwable { 1078 final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) { 1079 // arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls, 1080 // so 10 is relatively high and safe even if arbitrarily chosen 1081 int callsCounter = 10; 1082 1083 @Override 1084 public void onNext(T element) { 1085 env.debug(String.format("%s::onNext(%s)", this, element)); 1086 if (subscription.isCompleted()) { 1087 if (callsCounter > 0) { 1088 subscription.value().request(Long.MAX_VALUE - 1); 1089 callsCounter--; 1090 } else { 1091 subscription.value().cancel(); 1092 } 1093 } else { 1094 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 1095 } 1096 } 1097 }; 1098 env.subscribe(pub, sub, env.defaultTimeoutMillis()); 1099 1100 // eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)` 1101 // we're pretty sure to overflow from those 1102 sub.request(1); 1103 1104 // no onError should be signalled 1105 try { 1106 env.verifyNoAsyncErrors(); 1107 } finally { 1108 sub.cancel(); 1109 } 1110 } 1111 }); 1112 } 1113 1114 ///////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////// 1115 1116 ///////////////////// TEST INFRASTRUCTURE ///////////////////////////////// 1117 1118 public interface PublisherTestRun<T> { 1119 public void run(Publisher<T> pub) throws Throwable; 1120 } 1121 1122 /** 1123 * Test for feature that SHOULD/MUST be implemented, using a live publisher. 1124 * 1125 * @param elements the number of elements the Publisher under test must be able to emit to run this test 1126 * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run. 1127 * If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped. 1128 * To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}. 1129 */ 1130 public void activePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable { 1131 if (elements > maxElementsFromPublisher()) { 1132 throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher())); 1133 } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) { 1134 throw new SkipException("Unable to run this test, as it requires an onComplete signal, " + 1135 "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)"); 1136 } else { 1137 Publisher<T> pub = createPublisher(elements); 1138 body.run(pub); 1139 env.verifyNoAsyncErrorsNoDelay(); 1140 } 1141 } 1142 1143 /** 1144 * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails. 1145 * 1146 * @param elements the number of elements the Publisher under test must be able to emit to run this test 1147 * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run. 1148 * If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped. 1149 * To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}. 1150 */ 1151 public void optionalActivePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable { 1152 if (elements > maxElementsFromPublisher()) { 1153 throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher())); 1154 } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) { 1155 throw new SkipException("Unable to run this test, as it requires an onComplete signal, " + 1156 "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)"); 1157 } else { 1158 1159 final Publisher<T> pub = createPublisher(elements); 1160 final String skipMessage = "Skipped because tested publisher does NOT implement this OPTIONAL requirement."; 1161 1162 try { 1163 potentiallyPendingTest(pub, body); 1164 } catch (Exception ex) { 1165 notVerified(skipMessage); 1166 } catch (AssertionError ex) { 1167 notVerified(skipMessage + " Reason for skipping was: " + ex.getMessage()); 1168 } 1169 } 1170 } 1171 1172 public static final String SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE = 1173 "Skipping because no error state Publisher provided, and the test requires it. " + 1174 "Please implement PublisherVerification#createFailedPublisher to run this test."; 1175 1176 public static final String SKIPPING_OPTIONAL_TEST_FAILED = 1177 "Skipping, because provided Publisher does not pass this *additional* verification."; 1178 /** 1179 * Additional test for Publisher in error state 1180 */ 1181 public void whenHasErrorPublisherTest(PublisherTestRun<T> body) throws Throwable { 1182 potentiallyPendingTest(createFailedPublisher(), body, SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE); 1183 } 1184 1185 public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body) throws Throwable { 1186 potentiallyPendingTest(pub, body, SKIPPING_OPTIONAL_TEST_FAILED); 1187 } 1188 1189 public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body, String message) throws Throwable { 1190 if (pub != null) { 1191 body.run(pub); 1192 } else { 1193 throw new SkipException(message); 1194 } 1195 } 1196 1197 /** 1198 * Executes a given test body {@code n} times. 1199 * All the test runs must pass in order for the stochastic test to pass. 1200 */ 1201 public void stochasticTest(int n, Function<Integer, Void> body) throws Throwable { 1202 if (skipStochasticTests()) { 1203 notVerified("Skipping @Stochastic test because `skipStochasticTests()` returned `true`!"); 1204 } 1205 1206 for (int i = 0; i < n; i++) { 1207 body.apply(i); 1208 } 1209 } 1210 1211 public void notVerified() { 1212 throw new SkipException("Not verified by this TCK."); 1213 } 1214 1215 /** 1216 * Return this value from {@link PublisherVerification#maxElementsFromPublisher()} to mark that the given {@link org.reactivestreams.Publisher}, 1217 * is not able to signal completion. For example it is strictly a time-bound or unbounded source of data. 1218 * 1219 * <b>Returning this value from {@link PublisherVerification#maxElementsFromPublisher()} will result in skipping all TCK tests which require onComplete signals!</b> 1220 */ 1221 public long publisherUnableToSignalOnComplete() { 1222 return Long.MAX_VALUE; 1223 } 1224 1225 public void notVerified(String message) { 1226 throw new SkipException(message); 1227 } 1228 1229}