001package org.reactivestreams.tck; 002 003import org.reactivestreams.Publisher; 004import org.reactivestreams.Subscriber; 005import org.reactivestreams.Subscription; 006import org.reactivestreams.tck.TestEnvironment.BlackholeSubscriberWithSubscriptionSupport; 007import org.reactivestreams.tck.TestEnvironment.Latch; 008import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; 009import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; 010import org.reactivestreams.tck.support.Function; 011import org.reactivestreams.tck.support.Optional; 012import org.reactivestreams.tck.support.PublisherVerificationRules; 013import org.testng.SkipException; 014import org.testng.annotations.BeforeMethod; 015import org.testng.annotations.Test; 016 017import java.lang.Override; 018import java.lang.ref.ReferenceQueue; 019import java.lang.ref.WeakReference; 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.Collections; 023import java.util.List; 024import java.util.Random; 025import java.util.concurrent.atomic.AtomicReference; 026 027import static org.testng.Assert.assertEquals; 028import static org.testng.Assert.assertTrue; 029 030/** 031 * Provides tests for verifying {@code Publisher} specification rules. 032 * 033 * @see org.reactivestreams.Publisher 034 */ 035public abstract class PublisherVerification<T> implements PublisherVerificationRules { 036 037 private static final String PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV = "PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS"; 038 private static final long DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS = 300L; 039 040 private final TestEnvironment env; 041 042 /** 043 * The amount of time after which a cancelled Subscriber reference should be dropped. 044 * See Rule 3.13 for details. 045 */ 046 private final long publisherReferenceGCTimeoutMillis; 047 048 /** 049 * Constructs a new verification class using the given env and configuration. 
050 * 051 * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 052 */ 053 public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) { 054 this.env = env; 055 this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis; 056 } 057 058 /** 059 * Constructs a new verification class using the given env and configuration. 060 * 061 * The value for {@code publisherReferenceGCTimeoutMillis} will be obtained by using {@link PublisherVerification#envPublisherReferenceGCTimeoutMillis()}. 062 */ 063 public PublisherVerification(TestEnvironment env) { 064 this.env = env; 065 this.publisherReferenceGCTimeoutMillis = envPublisherReferenceGCTimeoutMillis(); 066 } 067 068 /** 069 * Tries to parse the env variable {@code PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS} as long and returns the value if present, 070 * OR its default value ({@link PublisherVerification#DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS}). 071 * 072 * This value is used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 073 * 074 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 075 */ 076 public static long envPublisherReferenceGCTimeoutMillis() { 077 final String envMillis = System.getenv(PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV); 078 if (envMillis == null) return DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS; 079 else try { 080 return Long.parseLong(envMillis); 081 } catch(NumberFormatException ex) { 082 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV, envMillis), ex); 083 } 084 } 085 086 /** 087 * This is the main method you must implement in your test incarnation. 088 * It must create a Publisher for a stream with exactly the given number of elements. 
089 * If `elements` is `Long.MAX_VALUE` the produced stream must be infinite. 090 */ 091 public abstract Publisher<T> createPublisher(long elements); 092 093 /** 094 * By implementing this method, additional TCK tests concerning a "failed" publishers will be run. 095 * 096 * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription, 097 * followed by signalling {@code onError} on it, as specified by Rule 1.9. 098 * 099 * If you ignore these additional tests, return {@code null} from this method. 100 */ 101 public abstract Publisher<T> createFailedPublisher(); 102 103 104 /** 105 * Override and return lower value if your Publisher is only able to produce a known number of elements. 106 * For example, if it is designed to return at-most-one element, return {@code 1} from this method. 107 * 108 * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements. 109 * 110 * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE}, 111 * which will result in *skipping all tests which require an onComplete to be triggered* (!). 112 */ 113 public long maxElementsFromPublisher() { 114 return Long.MAX_VALUE - 1; 115 } 116 117 /** 118 * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}. 119 * Such tests MAY sometimes fail even though the impl 120 */ 121 public boolean skipStochasticTests() { 122 return false; 123 } 124 125 /** 126 * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a 127 * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of 128 * recursive calls to exceed the number returned by this method. 
129 * 130 * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a> 131 * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion() 132 */ 133 public long boundedDepthOfOnNextAndRequestRecursion() { 134 return 1; 135 } 136 137 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 138 139 @BeforeMethod 140 public void setUp() throws Exception { 141 env.clearAsyncErrors(); 142 } 143 144 ////////////////////// TEST SETUP VERIFICATION ////////////////////////////// 145 146 @Override @Test 147 public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable { 148 activePublisherTest(1, true, new PublisherTestRun<T>() { 149 @Override 150 public void run(Publisher<T> pub) throws InterruptedException { 151 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 152 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub)); 153 sub.requestEndOfStream(); 154 } 155 156 Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException { 157 return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub)); 158 } 159 160 }); 161 } 162 163 @Override @Test 164 public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable { 165 activePublisherTest(3, true, new PublisherTestRun<T>() { 166 @Override 167 public void run(Publisher<T> pub) throws InterruptedException { 168 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 169 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub)); 170 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 1 element", pub)); 171 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), 
String.format("Publisher %s produced only 2 elements", pub)); 172 sub.requestEndOfStream(); 173 } 174 175 Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException { 176 return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub)); 177 } 178 179 }); 180 } 181 182 @Override @Test 183 public void required_validate_maxElementsFromPublisher() throws Exception { 184 assertTrue(maxElementsFromPublisher() >= 0, "maxElementsFromPublisher MUST return a number >= 0"); 185 } 186 187 @Override @Test 188 public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception { 189 assertTrue(boundedDepthOfOnNextAndRequestRecursion() >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1"); 190 } 191 192 193 ////////////////////// SPEC RULE VERIFICATION /////////////////////////////// 194 195 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.1 196 @Override @Test 197 public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable { 198 activePublisherTest(5, false, new PublisherTestRun<T>() { 199 @Override 200 public void run(Publisher<T> pub) throws InterruptedException { 201 202 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 203 204 sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub)); 205 sub.request(1); 206 sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub)); 207 sub.expectNone(String.format("Publisher %s produced unrequested: ", pub)); 208 209 sub.request(1); 210 sub.request(2); 211 sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub)); 212 213 sub.expectNone(String.format("Publisher %sproduced unrequested ", pub)); 214 } 215 }); 216 } 217 218 // Verifies rule: 
https://github.com/reactive-streams/reactive-streams-jvm#1.2 219 @Override @Test 220 public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable { 221 final int elements = 3; 222 final int requested = 10; 223 224 activePublisherTest(elements, true, new PublisherTestRun<T>() { 225 @Override 226 public void run(Publisher<T> pub) throws Throwable { 227 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 228 sub.request(requested); 229 sub.nextElements(elements); 230 sub.expectCompletion(); 231 } 232 }); 233 } 234 235 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.3 236 @Override @Test 237 public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable { 238 final int iterations = 100; 239 final int elements = 10; 240 241 stochasticTest(iterations, new Function<Integer, Void>() { 242 @Override 243 public Void apply(final Integer runNumber) throws Throwable { 244 activePublisherTest(elements, true, new PublisherTestRun<T>() { 245 @Override 246 public void run(Publisher<T> pub) throws Throwable { 247 final Latch completionLatch = new Latch(env); 248 249 pub.subscribe(new Subscriber<T>() { 250 private Subscription subs; 251 private long gotElements = 0; 252 253 private ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier(); 254 255 /** 256 * Concept wise very similar to a {@link org.reactivestreams.tck.TestEnvironment.Latch}, serves to protect 257 * a critical section from concurrent access, with the added benefit of Thread tracking and same-thread-access awareness. 
258 * 259 * Since a <i>Synchronous</i> Publisher may choose to synchronously (using the same {@link Thread}) call 260 * {@code onNext} directly from either {@code subscribe} or {@code request} a plain Latch is not enough 261 * to verify concurrent access safety - one needs to track if the caller is not still using the calling thread 262 * to enter subsequent critical sections ("nesting" them effectively). 263 */ 264 final class ConcurrentAccessBarrier { 265 private AtomicReference<Thread> currentlySignallingThread = new AtomicReference<Thread>(null); 266 private volatile String previousSignal = null; 267 268 public void enterSignal(String signalName) { 269 if((!currentlySignallingThread.compareAndSet(null, Thread.currentThread())) && !isSynchronousSignal()) { 270 env.flop(String.format( 271 "Illegal concurrent access detected (entering critical section)! " + 272 "%s emited %s signal, before %s finished its %s signal.", 273 Thread.currentThread(), signalName, currentlySignallingThread.get(), previousSignal)); 274 } 275 this.previousSignal = signalName; 276 } 277 278 public void leaveSignal(String signalName) { 279 currentlySignallingThread.set(null); 280 this.previousSignal = signalName; 281 } 282 283 private boolean isSynchronousSignal() { 284 return (previousSignal != null) && Thread.currentThread().equals(currentlySignallingThread.get()); 285 } 286 287 } 288 289 @Override 290 public void onSubscribe(Subscription s) { 291 final String signal = "onSubscribe()"; 292 concurrentAccessBarrier.enterSignal(signal); 293 294 subs = s; 295 subs.request(1); 296 297 concurrentAccessBarrier.leaveSignal(signal); 298 } 299 300 @Override 301 public void onNext(T ignore) { 302 final String signal = String.format("onNext(%s)", ignore); 303 concurrentAccessBarrier.enterSignal(signal); 304 305 gotElements += 1; 306 if (gotElements <= elements) // requesting one more than we know are in the stream (some Publishers need this) 307 subs.request(1); 308 309 
concurrentAccessBarrier.leaveSignal(signal); 310 } 311 312 @Override 313 public void onError(Throwable t) { 314 final String signal = String.format("onError(%s)", t.getMessage()); 315 concurrentAccessBarrier.enterSignal(signal); 316 317 // ignore value 318 319 concurrentAccessBarrier.leaveSignal(signal); 320 } 321 322 @Override 323 public void onComplete() { 324 final String signal = "onComplete()"; 325 concurrentAccessBarrier.enterSignal(signal); 326 327 // entering for completeness 328 329 concurrentAccessBarrier.leaveSignal(signal); 330 completionLatch.close(); 331 } 332 }); 333 334 completionLatch.expectClose(elements * env.defaultTimeoutMillis(), "Expected 10 elements to be drained"); 335 } 336 }); 337 return null; 338 } 339 }); 340 } 341 342 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.4 343 @Override @Test 344 public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable { 345 try { 346 whenHasErrorPublisherTest(new PublisherTestRun<T>() { 347 @Override 348 public void run(final Publisher<T> pub) throws InterruptedException { 349 final Latch onErrorlatch = new Latch(env); 350 final Latch onSubscribeLatch = new Latch(env); 351 pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) { 352 @Override 353 public void onSubscribe(Subscription subs) { 354 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 355 onSubscribeLatch.close(); 356 } 357 @Override 358 public void onError(Throwable cause) { 359 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 360 onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); 361 onErrorlatch.close(); 362 } 363 }); 364 365 onSubscribeLatch.expectClose("Should have received onSubscribe"); 366 onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub)); 367 368 env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); 369 } 370 }); 
371 } catch (SkipException se) { 372 throw se; 373 } catch (Throwable ex) { 374 // we also want to catch AssertionErrors and anything the publisher may have thrown inside subscribe 375 // which was wrong of him - he should have signalled on error using onError 376 throw new RuntimeException(String.format("Publisher threw exception (%s) instead of signalling error via onError!", ex.getMessage()), ex); 377 } 378 } 379 380 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.5 381 @Override @Test 382 public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable { 383 activePublisherTest(3, true, new PublisherTestRun<T>() { 384 @Override 385 public void run(Publisher<T> pub) throws Throwable { 386 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 387 sub.requestNextElement(); 388 sub.requestNextElement(); 389 sub.requestNextElement(); 390 sub.requestEndOfStream(); 391 sub.expectNone(); 392 } 393 }); 394 } 395 396 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.5 397 @Override @Test 398 public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable { 399 optionalActivePublisherTest(0, true, new PublisherTestRun<T>() { 400 @Override 401 public void run(Publisher<T> pub) throws Throwable { 402 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 403 sub.request(1); 404 sub.expectCompletion(); 405 sub.expectNone(); 406 } 407 }); 408 } 409 410 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.6 411 @Override @Test 412 public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable { 413 notVerified(); // not really testable without more control over the Publisher 414 } 415 416 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.7 417 @Override @Test 418 public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() 
throws Throwable { 419 activePublisherTest(1, true, new PublisherTestRun<T>() { 420 @Override 421 public void run(Publisher<T> pub) throws Throwable { 422 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 423 sub.request(10); 424 sub.nextElement(); 425 sub.expectCompletion(); 426 427 sub.request(10); 428 sub.expectNone(); 429 } 430 }); 431 } 432 433 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.7 434 @Override @Test 435 public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable { 436 notVerified(); // can we meaningfully test this, without more control over the publisher? 437 } 438 439 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.8 440 @Override @Test 441 public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable { 442 notVerified(); // can we meaningfully test this? 443 } 444 445 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.9 446 @Override @Test 447 public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable { 448 notVerified(); // can we meaningfully test this? 
449 } 450 451 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.9 452 @Override @Test 453 public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable { 454 activePublisherTest(0, false, new PublisherTestRun<T>() { 455 @Override 456 public void run(Publisher<T> pub) throws Throwable { 457 try { 458 pub.subscribe(null); 459 env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe"); 460 } catch (NullPointerException ignored) { 461 // valid behaviour 462 } 463 env.verifyNoAsyncErrorsNoDelay(); 464 } 465 }); 466 } 467 468 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.9 469 @Override @Test 470 public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable { 471 activePublisherTest(0, false, new PublisherTestRun<T>() { 472 @Override 473 public void run(Publisher<T> pub) throws Throwable { 474 final Latch onSubscribeLatch = new Latch(env); 475 pub.subscribe(new Subscriber<T>() { 476 @Override 477 public void onError(Throwable cause) { 478 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 479 } 480 481 @Override 482 public void onSubscribe(Subscription subs) { 483 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 484 onSubscribeLatch.close(); 485 } 486 487 @Override 488 public void onNext(T elem) { 489 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always"); 490 } 491 492 @Override 493 public void onComplete() { 494 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always"); 495 } 496 }); 497 onSubscribeLatch.expectClose("Should have received onSubscribe"); 498 env.verifyNoAsyncErrorsNoDelay(); 499 } 500 }); 501 } 502 503 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.9 504 @Override @Test 505 public void 
required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable { 506 whenHasErrorPublisherTest(new PublisherTestRun<T>() { 507 @Override 508 public void run(Publisher<T> pub) throws Throwable { 509 final Latch onErrorLatch = new Latch(env); 510 final Latch onSubscribeLatch = new Latch(env); 511 ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 512 @Override 513 public void onError(Throwable cause) { 514 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 515 onErrorLatch.assertOpen("Only one onError call expected"); 516 onErrorLatch.close(); 517 } 518 519 @Override 520 public void onSubscribe(Subscription subs) { 521 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 522 onSubscribeLatch.close(); 523 } 524 }; 525 pub.subscribe(sub); 526 onSubscribeLatch.expectClose("Should have received onSubscribe"); 527 onErrorLatch.expectClose("Should have received onError"); 528 529 env.verifyNoAsyncErrorsNoDelay(); 530 } 531 }); 532 } 533 534 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.10 535 @Override @Test 536 public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable { 537 notVerified(); // can we meaningfully test this? 
538 } 539 540 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11 541 @Override @Test 542 public void optional_spec111_maySupportMultiSubscribe() throws Throwable { 543 optionalActivePublisherTest(1, false, new PublisherTestRun<T>() { 544 @Override 545 public void run(Publisher<T> pub) throws Throwable { 546 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 547 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 548 549 env.verifyNoAsyncErrors(); 550 } 551 }); 552 } 553 554 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11 555 @Override @Test 556 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { 557 optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete) 558 @Override 559 public void run(Publisher<T> pub) throws InterruptedException { 560 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 561 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 562 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 563 564 sub1.request(1); 565 T x1 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 566 sub2.request(2); 567 List<T> y1 = sub2.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 2nd subscriber", pub)); 568 sub1.request(1); 569 T x2 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 570 sub3.request(3); 571 List<T> z1 = sub3.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 3rd subscriber", pub)); 572 sub3.request(1); 573 T z2 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub)); 574 sub3.request(1); 575 T z3 = 
sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub)); 576 sub3.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 3rd subscriber", pub)); 577 sub2.request(3); 578 List<T> y2 = sub2.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 2nd subscriber", pub)); 579 sub2.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 2nd subscriber", pub)); 580 sub1.request(2); 581 List<T> x3 = sub1.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 1st subscriber", pub)); 582 sub1.request(1); 583 T x4 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 584 sub1.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 1st subscriber", pub)); 585 586 @SuppressWarnings("unchecked") 587 List<T> r = new ArrayList<T>(Arrays.asList(x1, x2)); 588 r.addAll(x3); 589 r.addAll(Collections.singleton(x4)); 590 591 List<T> check1 = new ArrayList<T>(y1); 592 check1.addAll(y2); 593 594 //noinspection unchecked 595 List<T> check2 = new ArrayList<T>(z1); 596 check2.add(z2); 597 check2.add(z3); 598 599 assertEquals(r, check1, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 2", pub)); 600 assertEquals(r, check2, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 3", pub)); 601 } 602 }); 603 } 604 605 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11 606 @Override @Test 607 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { 608 optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements 609 @Override 610 public void 
run(Publisher<T> pub) throws Throwable { 611 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 612 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 613 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 614 615 List<T> received1 = new ArrayList<T>(); 616 List<T> received2 = new ArrayList<T>(); 617 List<T> received3 = new ArrayList<T>(); 618 619 // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains... 620 // edgy edge case? 621 sub1.request(4); 622 sub2.request(4); 623 sub3.request(4); 624 625 received1.addAll(sub1.nextElements(3)); 626 received2.addAll(sub2.nextElements(3)); 627 received3.addAll(sub3.nextElements(3)); 628 629 // NOTE: can't check completion, the Publisher may not be able to signal it 630 // a similar test *with* completion checking is implemented 631 632 assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers")); 633 assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers")); 634 } 635 }); 636 } 637 638 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11 639 @Override @Test 640 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { 641 optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete) 642 @Override 643 public void run(Publisher<T> pub) throws Throwable { 644 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 645 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 646 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 647 648 List<T> received1 = new ArrayList<T>(); 649 List<T> received2 = new ArrayList<T>(); 650 List<T> received3 = new 
ArrayList<T>(); 651 652 // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains... 653 // edgy edge case? 654 sub1.request(4); 655 sub2.request(4); 656 sub3.request(4); 657 658 received1.addAll(sub1.nextElements(3)); 659 received2.addAll(sub2.nextElements(3)); 660 received3.addAll(sub3.nextElements(3)); 661 662 sub1.expectCompletion(); 663 sub2.expectCompletion(); 664 sub3.expectCompletion(); 665 666 assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers")); 667 assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers")); 668 } 669 }); 670 } 671 672 ///////////////////// SUBSCRIPTION TESTS ////////////////////////////////// 673 674 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.2 675 @Override @Test 676 public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable { 677 activePublisherTest(6, false, new PublisherTestRun<T>() { 678 @Override 679 public void run(Publisher<T> pub) throws Throwable { 680 ManualSubscriber<T> sub = new ManualSubscriber<T>(env) { 681 @Override 682 public void onSubscribe(Subscription subs) { 683 this.subscription.completeImmediatly(subs); 684 685 subs.request(1); 686 subs.request(1); 687 subs.request(1); 688 } 689 690 @Override 691 public void onNext(T element) { 692 Subscription subs = this.subscription.value(); 693 subs.request(1); 694 } 695 }; 696 697 env.subscribe(pub, sub); 698 699 long delay = env.defaultTimeoutMillis(); 700 env.verifyNoAsyncErrors(delay); 701 } 702 }); 703 } 704 705 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.3 706 @Override @Test 707 public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable { 708 final long oneMoreThanBoundedLimit = 
boundedDepthOfOnNextAndRequestRecursion() + 1; 709 710 activePublisherTest(oneMoreThanBoundedLimit, false, new PublisherTestRun<T>() { 711 @Override 712 public void run(Publisher<T> pub) throws Throwable { 713 final ThreadLocal<Long> stackDepthCounter = new ThreadLocal<Long>() { 714 @Override 715 protected Long initialValue() { 716 return 0L; 717 } 718 }; 719 720 final Latch runCompleted = new Latch(env); 721 722 final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 723 // counts the number of signals received, used to break out from possibly infinite request/onNext loops 724 long signalsReceived = 0L; 725 726 @Override 727 public void onNext(T element) { 728 // NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements 729 // which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing 730 731 signalsReceived += 1; 732 stackDepthCounter.set(stackDepthCounter.get() + 1); 733 env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element)); 734 735 final long callsUntilNow = stackDepthCounter.get(); 736 if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) { 737 env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d", 738 callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion())); 739 740 // stop the recursive call chain 741 runCompleted.close(); 742 return; 743 } else if (signalsReceived >= oneMoreThanBoundedLimit) { 744 // since max number of signals reached, and recursion depth not exceeded, we judge this as a success and 745 // stop the recursive call chain 746 runCompleted.close(); 747 return; 748 } 749 750 // request more right away, the Publisher must break the recursion 751 subscription.value().request(1); 752 753 stackDepthCounter.set(stackDepthCounter.get() - 1); 754 } 755 756 @Override 757 public void 
onComplete() { 758 super.onComplete(); 759 runCompleted.close(); 760 } 761 762 @Override 763 public void onError(Throwable cause) { 764 super.onError(cause); 765 runCompleted.close(); 766 } 767 }; 768 769 try { 770 env.subscribe(pub, sub); 771 772 sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...` 773 774 final String msg = String.format("Unable to validate call stack depth safety, " + 775 "awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion", 776 oneMoreThanBoundedLimit); 777 runCompleted.expectClose(env.defaultTimeoutMillis(), msg); 778 env.verifyNoAsyncErrorsNoDelay(); 779 } finally { 780 // since the request/onNext recursive calls may keep the publisher running "forever", 781 // we MUST cancel it manually before exiting this test case 782 sub.cancel(); 783 } 784 } 785 }); 786 } 787 788 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.4 789 @Override @Test 790 public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception { 791 notVerified(); // cannot be meaningfully tested, or can it? 792 } 793 794 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.5 795 @Override @Test 796 public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation() throws Exception { 797 notVerified(); // cannot be meaningfully tested, or can it? 798 } 799 800 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.6 801 @Override @Test 802 public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable { 803 activePublisherTest(3, false, new PublisherTestRun<T>() { 804 @Override 805 public void run(Publisher<T> pub) throws Throwable { 806 807 // override ManualSubscriberWithSubscriptionSupport#cancel because by default a ManualSubscriber will drop the 808 // subscription once it's cancelled (as expected). 
809 // In this test however it must keep the cancelled Subscription and keep issuing `request(long)` to it. 810 ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 811 @Override 812 public void cancel() { 813 if (subscription.isCompleted()) { 814 subscription.value().cancel(); 815 } else { 816 env.flop("Cannot cancel a subscription before having received it"); 817 } 818 } 819 }; 820 821 env.subscribe(pub, sub); 822 823 sub.cancel(); 824 sub.request(1); 825 sub.request(1); 826 sub.request(1); 827 828 sub.expectNone(); 829 env.verifyNoAsyncErrorsNoDelay(); 830 } 831 }); 832 } 833 834 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.7 835 @Override @Test 836 public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable { 837 activePublisherTest(1, false, new PublisherTestRun<T>() { 838 @Override 839 public void run(Publisher<T> pub) throws Throwable { 840 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 841 842 // leak the Subscription 843 final Subscription subs = sub.subscription.value(); 844 845 subs.cancel(); 846 subs.cancel(); 847 subs.cancel(); 848 849 sub.expectNone(); 850 env.verifyNoAsyncErrorsNoDelay(); 851 } 852 }); 853 } 854 855 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.9 856 @Override @Test 857 public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable { 858 activePublisherTest(10, false, new PublisherTestRun<T>() { 859 @Override public void run(Publisher<T> pub) throws Throwable { 860 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 861 sub.request(0); 862 sub.expectErrorWithMessage(IllegalArgumentException.class, "3.9"); // we do require implementations to mention the rule number at the very least 863 } 864 }); 865 } 866 867 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.9 868 @Override @Test 869 public void 
required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable { 870 activePublisherTest(10, false, new PublisherTestRun<T>() { 871 @Override 872 public void run(Publisher<T> pub) throws Throwable { 873 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 874 final Random r = new Random(); 875 sub.request(-r.nextInt(Integer.MAX_VALUE)); 876 sub.expectErrorWithMessage(IllegalArgumentException.class, "3.9"); // we do require implementations to mention the rule number at the very least 877 } 878 }); 879 } 880 881 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.12 882 @Override @Test 883 public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable { 884 // the publisher is able to signal more elements than the subscriber will be requesting in total 885 final int publisherElements = 20; 886 887 final int demand1 = 10; 888 final int demand2 = 5; 889 final int totalDemand = demand1 + demand2; 890 891 activePublisherTest(publisherElements, false, new PublisherTestRun<T>() { 892 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 893 public void run(Publisher<T> pub) throws Throwable { 894 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 895 896 sub.request(demand1); 897 sub.request(demand2); 898 899 /* 900 NOTE: The order of the nextElement/cancel calls below is very important (!) 901 902 If this ordering was reversed, given an asynchronous publisher, 903 the following scenario would be *legal* and would break this test: 904 905 > AsyncPublisher receives request(10) - it does not emit data right away, it's asynchronous 906 > AsyncPublisher receives request(5) - demand is now 15 907 ! AsyncPublisher didn't emit any onNext yet (!) 908 > AsyncPublisher receives cancel() - handles it right away, by "stopping itself" for example 909 ! cancel was handled hefore the AsyncPublisher ever got the chance to emit data 910 ! 
the subscriber ends up never receiving even one element - the test is stuck (and fails, even on valid Publisher) 911 912 Which is why we must first expect an element, and then cancel, once the producing is "running". 913 */ 914 sub.nextElement(); 915 sub.cancel(); 916 917 int onNextsSignalled = 1; 918 919 boolean stillBeingSignalled; 920 do { 921 // put asyncError if onNext signal received 922 sub.expectNone(); 923 Throwable error = env.dropAsyncError(); 924 925 if (error == null) { 926 stillBeingSignalled = false; 927 } else { 928 onNextsSignalled += 1; 929 stillBeingSignalled = true; 930 } 931 932 // if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw 933 assertTrue(onNextsSignalled <= totalDemand, 934 String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d", 935 onNextsSignalled, totalDemand)); 936 937 } while (stillBeingSignalled); 938 } 939 }); 940 941 env.verifyNoAsyncErrorsNoDelay(); 942 } 943 944 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.13 945 @Override @Test 946 public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable { 947 final ReferenceQueue<ManualSubscriber<T>> queue = new ReferenceQueue<ManualSubscriber<T>>(); 948 949 final Function<Publisher<T>, WeakReference<ManualSubscriber<T>>> run = new Function<Publisher<T>, WeakReference<ManualSubscriber<T>>>() { 950 @Override 951 public WeakReference<ManualSubscriber<T>> apply(Publisher<T> pub) throws Exception { 952 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 953 final WeakReference<ManualSubscriber<T>> ref = new WeakReference<ManualSubscriber<T>>(sub, queue); 954 955 sub.request(1); 956 sub.nextElement(); 957 sub.cancel(); 958 959 return ref; 960 } 961 }; 962 963 activePublisherTest(3, false, new PublisherTestRun<T>() { 964 @Override 965 public void run(Publisher<T> pub) throws Throwable { 966 final 
WeakReference<ManualSubscriber<T>> ref = run.apply(pub);

        // cancel may be run asynchronously so we add a sleep before running the GC
        // to "resolve" the race
        Thread.sleep(publisherReferenceGCTimeoutMillis);
        System.gc();

        // wait up to 100 ms for a reference to show up on the queue; it must be the subscriber's one
        final boolean subscriberReferenceEnqueued = ref.equals(queue.remove(100));
        if (!subscriberReferenceEnqueued) {
          env.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub));
        }

        env.verifyNoAsyncErrorsNoDelay();
      }
    });
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.17
  // Requests Long.MAX_VALUE in one call, then expects all elements followed by completion.
  @Override @Test
  public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable {
    final int totalElements = 3;

    final PublisherTestRun<T> scenario = new PublisherTestRun<T>() {
      @Override
      public void run(Publisher<T> pub) throws Throwable {
        final ManualSubscriber<T> subscriber = env.newManualSubscriber(pub);
        subscriber.request(Long.MAX_VALUE);

        subscriber.nextElements(totalElements);
        subscriber.expectCompletion();

        env.verifyNoAsyncErrorsNoDelay();
      }
    };

    activePublisherTest(totalElements, true, scenario);
  }

  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.17
  // Builds up a pending demand of exactly Long.MAX_VALUE over three request calls, then expects all elements
  // followed by completion.
  @Override @Test
  public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable {
    final int totalElements = 3;

    final PublisherTestRun<T> scenario = new PublisherTestRun<T>() {
      @Override
      public void run(Publisher<T> pub) throws Throwable {
        final ManualSubscriber<T> subscriber = env.newManualSubscriber(pub);
        subscriber.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2
        subscriber.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1
        subscriber.request(1); // pending = Long.MAX_VALUE

        subscriber.nextElements(totalElements);
        subscriber.expectCompletion();

        try {
          env.verifyNoAsyncErrorsNoDelay();
        } finally {
          subscriber.cancel();
        }
      }
    };

    activePublisherTest(totalElements, true, scenario);
  }

  // Verifies rule:
https://github.com/reactive-streams/reactive-streams-jvm#3.17 1028 @Override @Test 1029 public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { 1030 activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() { 1031 @Override public void run(Publisher<T> pub) throws Throwable { 1032 final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) { 1033 // arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls, 1034 // so 10 is relatively high and safe even if arbitrarily chosen 1035 int callsCounter = 10; 1036 1037 @Override 1038 public void onNext(T element) { 1039 env.debug(String.format("%s::onNext(%s)", this, element)); 1040 if (subscription.isCompleted()) { 1041 if (callsCounter > 0) { 1042 subscription.value().request(Long.MAX_VALUE - 1); 1043 callsCounter--; 1044 } else { 1045 subscription.value().cancel(); 1046 } 1047 } else { 1048 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 1049 } 1050 } 1051 }; 1052 env.subscribe(pub, sub, env.defaultTimeoutMillis()); 1053 1054 // eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)` 1055 // we're pretty sure to overflow from those 1056 sub.request(1); 1057 1058 // no onError should be signalled 1059 try { 1060 env.verifyNoAsyncErrors(env.defaultTimeoutMillis()); 1061 } finally { 1062 sub.cancel(); 1063 } 1064 } 1065 }); 1066 } 1067 1068 ///////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////// 1069 1070 ///////////////////// TEST INFRASTRUCTURE ///////////////////////////////// 1071 1072 public interface PublisherTestRun<T> { 1073 public void run(Publisher<T> pub) throws Throwable; 1074 } 1075 1076 /** 1077 * Test for feature that SHOULD/MUST be implemented, using a live publisher. 
1078 * 1079 * @param elements the number of elements the Publisher under test must be able to emit to run this test 1080 * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run. 1081 * If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped. 1082 * To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}. 1083 */ 1084 public void activePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable { 1085 if (elements > maxElementsFromPublisher()) { 1086 throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher())); 1087 } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) { 1088 throw new SkipException("Unable to run this test, as it requires an onComplete signal, " + 1089 "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)"); 1090 } else { 1091 Publisher<T> pub = createPublisher(elements); 1092 body.run(pub); 1093 env.verifyNoAsyncErrorsNoDelay(); 1094 } 1095 } 1096 1097 /** 1098 * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails. 1099 * 1100 * @param elements the number of elements the Publisher under test must be able to emit to run this test 1101 * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run. 1102 * If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped. 1103 * To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}. 
1104 */ 1105 public void optionalActivePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable { 1106 if (elements > maxElementsFromPublisher()) { 1107 throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher())); 1108 } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) { 1109 throw new SkipException("Unable to run this test, as it requires an onComplete signal, " + 1110 "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)"); 1111 } else { 1112 1113 final Publisher<T> pub = createPublisher(elements); 1114 final String skipMessage = "Skipped because tested publisher does NOT implement this OPTIONAL requirement."; 1115 1116 try { 1117 potentiallyPendingTest(pub, body); 1118 } catch (Exception ex) { 1119 notVerified(skipMessage); 1120 } catch (AssertionError ex) { 1121 notVerified(skipMessage + " Reason for skipping was: " + ex.getMessage()); 1122 } 1123 } 1124 } 1125 1126 public static final String SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE = 1127 "Skipping because no error state Publisher provided, and the test requires it. 
" + 1128 "Please implement PublisherVerification#createFailedPublisher to run this test."; 1129 1130 public static final String SKIPPING_OPTIONAL_TEST_FAILED = 1131 "Skipping, because provided Publisher does not pass this *additional* verification."; 1132 /** 1133 * Additional test for Publisher in error state 1134 */ 1135 public void whenHasErrorPublisherTest(PublisherTestRun<T> body) throws Throwable { 1136 potentiallyPendingTest(createFailedPublisher(), body, SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE); 1137 } 1138 1139 public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body) throws Throwable { 1140 potentiallyPendingTest(pub, body, SKIPPING_OPTIONAL_TEST_FAILED); 1141 } 1142 1143 public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body, String message) throws Throwable { 1144 if (pub != null) { 1145 body.run(pub); 1146 } else { 1147 throw new SkipException(message); 1148 } 1149 } 1150 1151 /** 1152 * Executes a given test body {@code n} times. 1153 * All the test runs must pass in order for the stochastic test to pass. 1154 */ 1155 public void stochasticTest(int n, Function<Integer, Void> body) throws Throwable { 1156 if (skipStochasticTests()) { 1157 notVerified("Skipping @Stochastic test because `skipStochasticTests()` returned `true`!"); 1158 } 1159 1160 for (int i = 0; i < n; i++) { 1161 body.apply(i); 1162 } 1163 } 1164 1165 public void notVerified() { 1166 throw new SkipException("Not verified by this TCK."); 1167 } 1168 1169 /** 1170 * Return this value from {@link PublisherVerification#maxElementsFromPublisher()} to mark that the given {@link org.reactivestreams.Publisher}, 1171 * is not able to signal completion. For example it is strictly a time-bound or unbounded source of data. 
*
   * <b>Returning this value from {@link PublisherVerification#maxElementsFromPublisher()} will result in skipping all TCK tests which require onComplete signals!</b>
   */
  public long publisherUnableToSignalOnComplete() {
    final long unableToCompleteMarker = Long.MAX_VALUE;
    return unableToCompleteMarker;
  }

  /**
   * Marks the current test as skipped by throwing a {@code SkipException} carrying the given message.
   *
   * @param message the reason reported for the skipped test
   */
  public void notVerified(String message) {
    final SkipException skipped = new SkipException(message);
    throw skipped;
  }

}