001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams.tck; 013 014import org.reactivestreams.Publisher; 015import org.reactivestreams.Subscriber; 016import org.reactivestreams.Subscription; 017import org.reactivestreams.tck.TestEnvironment.BlackholeSubscriberWithSubscriptionSupport; 018import org.reactivestreams.tck.TestEnvironment.Latch; 019import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; 020import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; 021import org.reactivestreams.tck.flow.support.Function; 022import org.reactivestreams.tck.flow.support.Optional; 023import org.reactivestreams.tck.flow.support.PublisherVerificationRules; 024import org.testng.SkipException; 025import org.testng.annotations.BeforeMethod; 026import org.testng.annotations.Test; 027 028import java.lang.Override; 029import java.lang.ref.ReferenceQueue; 030import java.lang.ref.WeakReference; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.List; 035import java.util.Random; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.concurrent.atomic.AtomicReference; 038 039import static org.testng.Assert.assertEquals; 040import static org.testng.Assert.assertTrue; 041 042/** 043 * Provides tests for verifying {@code Publisher} specification rules. 044 * 045 * @see org.reactivestreams.Publisher 046 */ 047public abstract class PublisherVerification<T> implements PublisherVerificationRules { 048 049 private static final String PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV = "PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS"; 050 private static final long DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS = 300L; 051 052 private final TestEnvironment env; 053 054 /** 055 * The amount of time after which a cancelled Subscriber reference should be dropped. 056 * See Rule 3.13 for details. 057 */ 058 private final long publisherReferenceGCTimeoutMillis; 059 060 /** 061 * Constructs a new verification class using the given env and configuration. 062 * 063 * @param publisherReferenceGCTimeoutMillis used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 064 */ 065 public PublisherVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) { 066 this.env = env; 067 this.publisherReferenceGCTimeoutMillis = publisherReferenceGCTimeoutMillis; 068 } 069 070 /** 071 * Constructs a new verification class using the given env and configuration. 072 * 073 * The value for {@code publisherReferenceGCTimeoutMillis} will be obtained by using {@link PublisherVerification#envPublisherReferenceGCTimeoutMillis()}. 074 */ 075 public PublisherVerification(TestEnvironment env) { 076 this.env = env; 077 this.publisherReferenceGCTimeoutMillis = envPublisherReferenceGCTimeoutMillis(); 078 } 079 080 /** 081 * Tries to parse the env variable {@code PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS} as long and returns the value if present, 082 * OR its default value ({@link PublisherVerification#DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS}). 083 * 084 * This value is used to determine after how much time a reference to a Subscriber should be already dropped by the Publisher. 085 * 086 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 087 */ 088 public static long envPublisherReferenceGCTimeoutMillis() { 089 final String envMillis = System.getenv(PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV); 090 if (envMillis == null) return DEFAULT_PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS; 091 else try { 092 return Long.parseLong(envMillis); 093 } catch (NumberFormatException ex) { 094 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS_ENV, envMillis), ex); 095 } 096 } 097 098 /** 099 * This is the main method you must implement in your test incarnation. 100 * It must create a Publisher for a stream with exactly the given number of elements. 101 * If `elements` is `Long.MAX_VALUE` the produced stream must be infinite. 102 */ 103 public abstract Publisher<T> createPublisher(long elements); 104 105 /** 106 * By implementing this method, additional TCK tests concerning a "failed" publishers will be run. 107 * 108 * The expected behaviour of the {@link Publisher} returned by this method is hand out a subscription, 109 * followed by signalling {@code onError} on it, as specified by Rule 1.9. 110 * 111 * If you ignore these additional tests, return {@code null} from this method. 112 */ 113 public abstract Publisher<T> createFailedPublisher(); 114 115 116 /** 117 * Override and return lower value if your Publisher is only able to produce a known number of elements. 118 * For example, if it is designed to return at-most-one element, return {@code 1} from this method. 119 * 120 * Defaults to {@code Long.MAX_VALUE - 1}, meaning that the Publisher can be produce a huge but NOT an unbounded number of elements. 121 * 122 * To mark your Publisher will *never* signal an {@code onComplete} override this method and return {@code Long.MAX_VALUE}, 123 * which will result in *skipping all tests which require an onComplete to be triggered* (!). 124 */ 125 public long maxElementsFromPublisher() { 126 return Long.MAX_VALUE - 1; 127 } 128 129 /** 130 * Override and return {@code true} in order to skip executing tests marked as {@code Stochastic}. 131 * Stochastic in this case means that the Rule is impossible or infeasible to deterministically verify— 132 * usually this means that this test case can yield false positives ("be green") even if for some case, 133 * the given implementation may violate the tested behaviour. 134 */ 135 public boolean skipStochasticTests() { 136 return false; 137 } 138 139 /** 140 * In order to verify rule 3.3 of the reactive streams spec, this number will be used to check if a 141 * {@code Subscription} actually solves the "unbounded recursion" problem by not allowing the number of 142 * recursive calls to exceed the number returned by this method. 143 * 144 * @see <a href="https://github.com/reactive-streams/reactive-streams-jvm#3.3">reactive streams spec, rule 3.3</a> 145 * @see PublisherVerification#required_spec303_mustNotAllowUnboundedRecursion() 146 */ 147 public long boundedDepthOfOnNextAndRequestRecursion() { 148 return 1; 149 } 150 151 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 152 153 @BeforeMethod 154 public void setUp() throws Exception { 155 env.clearAsyncErrors(); 156 } 157 158 ////////////////////// TEST SETUP VERIFICATION ////////////////////////////// 159 160 @Override @Test 161 public void required_createPublisher1MustProduceAStreamOfExactly1Element() throws Throwable { 162 activePublisherTest(1, true, new PublisherTestRun<T>() { 163 @Override 164 public void run(Publisher<T> pub) throws InterruptedException { 165 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 166 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub)); 167 sub.requestEndOfStream(); 168 } 169 170 Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException { 171 return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub)); 172 } 173 174 }); 175 } 176 177 @Override @Test 178 public void required_createPublisher3MustProduceAStreamOfExactly3Elements() throws Throwable { 179 activePublisherTest(3, true, new PublisherTestRun<T>() { 180 @Override 181 public void run(Publisher<T> pub) throws InterruptedException { 182 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 183 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced no elements", pub)); 184 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 1 element", pub)); 185 assertTrue(requestNextElementOrEndOfStream(pub, sub).isDefined(), String.format("Publisher %s produced only 2 elements", pub)); 186 sub.requestEndOfStream(); 187 } 188 189 Optional<T> requestNextElementOrEndOfStream(Publisher<T> pub, ManualSubscriber<T> sub) throws InterruptedException { 190 return sub.requestNextElementOrEndOfStream(String.format("Timeout while waiting for next element from Publisher %s", pub)); 191 } 192 193 }); 194 } 195 196 @Override @Test 197 public void required_validate_maxElementsFromPublisher() throws Exception { 198 assertTrue(maxElementsFromPublisher() >= 0, "maxElementsFromPublisher MUST return a number >= 0"); 199 } 200 201 @Override @Test 202 public void required_validate_boundedDepthOfOnNextAndRequestRecursion() throws Exception { 203 assertTrue(boundedDepthOfOnNextAndRequestRecursion() >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1"); 204 } 205 206 207 ////////////////////// SPEC RULE VERIFICATION /////////////////////////////// 208 209 @Override @Test 210 public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() throws Throwable { 211 activePublisherTest(5, false, new PublisherTestRun<T>() { 212 @Override 213 public void run(Publisher<T> pub) throws InterruptedException { 214 215 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 216 try { 217 sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub)); 218 sub.request(1); 219 sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub)); 220 sub.expectNone(String.format("Publisher %s produced unrequested: ", pub)); 221 222 sub.request(1); 223 sub.request(2); 224 sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub)); 225 226 sub.expectNone(String.format("Publisher %sproduced unrequested ", pub)); 227 } finally { 228 sub.cancel(); 229 } 230 } 231 }); 232 } 233 234 @Override @Test 235 public void required_spec102_maySignalLessThanRequestedAndTerminateSubscription() throws Throwable { 236 final int elements = 3; 237 final int requested = 10; 238 239 activePublisherTest(elements, true, new PublisherTestRun<T>() { 240 @Override 241 public void run(Publisher<T> pub) throws Throwable { 242 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 243 sub.request(requested); 244 sub.nextElements(elements); 245 sub.expectCompletion(); 246 } 247 }); 248 } 249 250 @Override @Test 251 public void stochastic_spec103_mustSignalOnMethodsSequentially() throws Throwable { 252 final int iterations = 100; 253 final int elements = 10; 254 255 stochasticTest(iterations, new Function<Integer, Void>() { 256 @Override 257 public Void apply(final Integer runNumber) throws Throwable { 258 activePublisherTest(elements, true, new PublisherTestRun<T>() { 259 @Override 260 public void run(Publisher<T> pub) throws Throwable { 261 final Latch completionLatch = new Latch(env); 262 263 final AtomicInteger gotElements = new AtomicInteger(0); 264 pub.subscribe(new Subscriber<T>() { 265 private Subscription subs; 266 267 private ConcurrentAccessBarrier concurrentAccessBarrier = new ConcurrentAccessBarrier(); 268 269 /** 270 * Concept wise very similar to a {@link org.reactivestreams.tck.TestEnvironment.Latch}, serves to protect 271 * a critical section from concurrent access, with the added benefit of Thread tracking and same-thread-access awareness. 272 * 273 * Since a <i>Synchronous</i> Publisher may choose to synchronously (using the same {@link Thread}) call 274 * {@code onNext} directly from either {@code subscribe} or {@code request} a plain Latch is not enough 275 * to verify concurrent access safety - one needs to track if the caller is not still using the calling thread 276 * to enter subsequent critical sections ("nesting" them effectively). 277 */ 278 final class ConcurrentAccessBarrier { 279 private AtomicReference<Thread> currentlySignallingThread = new AtomicReference<Thread>(null); 280 private volatile String previousSignal = null; 281 282 public void enterSignal(String signalName) { 283 if((!currentlySignallingThread.compareAndSet(null, Thread.currentThread())) && !isSynchronousSignal()) { 284 env.flop(String.format( 285 "Illegal concurrent access detected (entering critical section)! " + 286 "%s emited %s signal, before %s finished its %s signal.", 287 Thread.currentThread(), signalName, currentlySignallingThread.get(), previousSignal)); 288 } 289 this.previousSignal = signalName; 290 } 291 292 public void leaveSignal(String signalName) { 293 currentlySignallingThread.set(null); 294 this.previousSignal = signalName; 295 } 296 297 private boolean isSynchronousSignal() { 298 return (previousSignal != null) && Thread.currentThread().equals(currentlySignallingThread.get()); 299 } 300 301 } 302 303 @Override 304 public void onSubscribe(Subscription s) { 305 final String signal = "onSubscribe()"; 306 concurrentAccessBarrier.enterSignal(signal); 307 308 subs = s; 309 subs.request(1); 310 311 concurrentAccessBarrier.leaveSignal(signal); 312 } 313 314 @Override 315 public void onNext(T ignore) { 316 final String signal = String.format("onNext(%s)", ignore); 317 concurrentAccessBarrier.enterSignal(signal); 318 319 if (gotElements.incrementAndGet() <= elements) // requesting one more than we know are in the stream (some Publishers need this) 320 subs.request(1); 321 322 concurrentAccessBarrier.leaveSignal(signal); 323 } 324 325 @Override 326 public void onError(Throwable t) { 327 final String signal = String.format("onError(%s)", t.getMessage()); 328 concurrentAccessBarrier.enterSignal(signal); 329 330 // ignore value 331 332 concurrentAccessBarrier.leaveSignal(signal); 333 } 334 335 @Override 336 public void onComplete() { 337 final String signal = "onComplete()"; 338 concurrentAccessBarrier.enterSignal(signal); 339 340 // entering for completeness 341 342 concurrentAccessBarrier.leaveSignal(signal); 343 completionLatch.close(); 344 } 345 }); 346 347 completionLatch.expectClose( 348 elements * env.defaultTimeoutMillis(), 349 String.format("Failed in iteration %d of %d. Expected completion signal after signalling %d elements (signalled %d), yet did not receive it", 350 runNumber, iterations, elements, gotElements.get())); 351 } 352 }); 353 return null; 354 } 355 }); 356 } 357 358 @Override @Test 359 public void optional_spec104_mustSignalOnErrorWhenFails() throws Throwable { 360 try { 361 whenHasErrorPublisherTest(new PublisherTestRun<T>() { 362 @Override 363 public void run(final Publisher<T> pub) throws InterruptedException { 364 final Latch onErrorlatch = new Latch(env); 365 final Latch onSubscribeLatch = new Latch(env); 366 pub.subscribe(new TestEnvironment.TestSubscriber<T>(env) { 367 @Override 368 public void onSubscribe(Subscription subs) { 369 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 370 onSubscribeLatch.close(); 371 } 372 @Override 373 public void onError(Throwable cause) { 374 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 375 onErrorlatch.assertOpen(String.format("Error-state Publisher %s called `onError` twice on new Subscriber", pub)); 376 onErrorlatch.close(); 377 } 378 }); 379 380 onSubscribeLatch.expectClose("Should have received onSubscribe"); 381 onErrorlatch.expectClose(String.format("Error-state Publisher %s did not call `onError` on new Subscriber", pub)); 382 383 env.verifyNoAsyncErrors(); 384 } 385 }); 386 } catch (SkipException se) { 387 throw se; 388 } catch (Throwable ex) { 389 // we also want to catch AssertionErrors and anything the publisher may have thrown inside subscribe 390 // which was wrong of him - he should have signalled on error using onError 391 throw new RuntimeException(String.format("Publisher threw exception (%s) instead of signalling error via onError!", ex.getMessage()), ex); 392 } 393 } 394 395 @Override @Test 396 public void required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() throws Throwable { 397 activePublisherTest(3, true, new PublisherTestRun<T>() { 398 @Override 399 public void run(Publisher<T> pub) throws Throwable { 400 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 401 sub.requestNextElement(); 402 sub.requestNextElement(); 403 sub.requestNextElement(); 404 sub.requestEndOfStream(); 405 sub.expectNone(); 406 } 407 }); 408 } 409 410 @Override @Test 411 public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() throws Throwable { 412 optionalActivePublisherTest(0, true, new PublisherTestRun<T>() { 413 @Override 414 public void run(Publisher<T> pub) throws Throwable { 415 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 416 sub.request(1); 417 sub.expectCompletion(); 418 sub.expectNone(); 419 } 420 }); 421 } 422 423 @Override @Test 424 public void untested_spec106_mustConsiderSubscriptionCancelledAfterOnErrorOrOnCompleteHasBeenCalled() throws Throwable { 425 notVerified(); // not really testable without more control over the Publisher 426 } 427 428 @Override @Test 429 public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() throws Throwable { 430 activePublisherTest(1, true, new PublisherTestRun<T>() { 431 @Override 432 public void run(Publisher<T> pub) throws Throwable { 433 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 434 sub.request(10); 435 sub.nextElement(); 436 sub.expectCompletion(); 437 438 sub.request(10); 439 sub.expectNone(); 440 } 441 }); 442 } 443 444 @Override @Test 445 public void untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() throws Throwable { 446 notVerified(); // can we meaningfully test this, without more control over the publisher? 447 } 448 449 @Override @Test 450 public void untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals() throws Throwable { 451 notVerified(); // can we meaningfully test this? 452 } 453 454 @Override @Test 455 public void untested_spec109_subscribeShouldNotThrowNonFatalThrowable() throws Throwable { 456 notVerified(); // can we meaningfully test this? 457 } 458 459 @Override @Test 460 public void required_spec109_subscribeThrowNPEOnNullSubscriber() throws Throwable { 461 activePublisherTest(0, false, new PublisherTestRun<T>() { 462 @Override 463 public void run(Publisher<T> pub) throws Throwable { 464 try { 465 pub.subscribe(null); 466 env.flop("Publisher did not throw a NullPointerException when given a null Subscribe in subscribe"); 467 } catch (NullPointerException ignored) { 468 // valid behaviour 469 } 470 env.verifyNoAsyncErrorsNoDelay(); 471 } 472 }); 473 } 474 475 @Override @Test 476 public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws Throwable { 477 activePublisherTest(0, false, new PublisherTestRun<T>() { 478 @Override 479 public void run(Publisher<T> pub) throws Throwable { 480 final Latch onSubscribeLatch = new Latch(env); 481 final AtomicReference<Subscription> cancel = new AtomicReference<Subscription>(); 482 try { 483 pub.subscribe(new Subscriber<T>() { 484 @Override 485 public void onError(Throwable cause) { 486 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 487 } 488 489 @Override 490 public void onSubscribe(Subscription subs) { 491 cancel.set(subs); 492 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 493 onSubscribeLatch.close(); 494 } 495 496 @Override 497 public void onNext(T elem) { 498 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always"); 499 } 500 501 @Override 502 public void onComplete() { 503 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always"); 504 } 505 }); 506 onSubscribeLatch.expectClose("Should have received onSubscribe"); 507 env.verifyNoAsyncErrorsNoDelay(); 508 } finally { 509 Subscription s = cancel.getAndSet(null); 510 if (s != null) { 511 s.cancel(); 512 } 513 } 514 } 515 }); 516 } 517 518 @Override @Test 519 public void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable { 520 whenHasErrorPublisherTest(new PublisherTestRun<T>() { 521 @Override 522 public void run(Publisher<T> pub) throws Throwable { 523 final Latch onErrorLatch = new Latch(env); 524 final Latch onSubscribeLatch = new Latch(env); 525 ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 526 @Override 527 public void onError(Throwable cause) { 528 onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always"); 529 onErrorLatch.assertOpen("Only one onError call expected"); 530 onErrorLatch.close(); 531 } 532 533 @Override 534 public void onSubscribe(Subscription subs) { 535 onSubscribeLatch.assertOpen("Only one onSubscribe call expected"); 536 onSubscribeLatch.close(); 537 } 538 }; 539 pub.subscribe(sub); 540 onSubscribeLatch.expectClose("Should have received onSubscribe"); 541 onErrorLatch.expectClose("Should have received onError"); 542 543 env.verifyNoAsyncErrorsNoDelay(); 544 } 545 }); 546 } 547 548 @Override @Test 549 public void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable { 550 notVerified(); // can we meaningfully test this? 551 } 552 553 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11 554 @Override @Test 555 public void optional_spec111_maySupportMultiSubscribe() throws Throwable { 556 optionalActivePublisherTest(1, false, new PublisherTestRun<T>() { 557 @Override 558 public void run(Publisher<T> pub) throws Throwable { 559 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 560 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 561 562 try { 563 env.verifyNoAsyncErrors(); 564 } finally { 565 try { 566 sub1.cancel(); 567 } finally { 568 sub2.cancel(); 569 } 570 } 571 } 572 }); 573 } 574 575 @Override @Test 576 public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable { 577 optionalActivePublisherTest(1, false, new PublisherTestRun<T>() { 578 @Override 579 public void run(Publisher<T> pub) throws Throwable { 580 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 581 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 582 // Since we're testing the case when the Publisher DOES support the optional multi-subscribers scenario, 583 // and decides if it handles them uni-cast or multi-cast, we don't know which subscriber will receive an 584 // onNext (and optional onComplete) signal(s) and which just onComplete signal. 585 // Plus, even if subscription assumed to be unicast, it's implementation choice, which one will be signalled 586 // with onNext. 587 sub1.requestNextElementOrEndOfStream(); 588 sub2.requestNextElementOrEndOfStream(); 589 try { 590 env.verifyNoAsyncErrors(); 591 } finally { 592 try { 593 sub1.cancel(); 594 } finally { 595 sub2.cancel(); 596 } 597 } 598 } 599 }); 600 } 601 602 @Override @Test 603 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { 604 optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete) 605 @Override 606 public void run(Publisher<T> pub) throws InterruptedException { 607 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 608 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 609 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 610 611 sub1.request(1); 612 T x1 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 613 sub2.request(2); 614 List<T> y1 = sub2.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 2nd subscriber", pub)); 615 sub1.request(1); 616 T x2 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 617 sub3.request(3); 618 List<T> z1 = sub3.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 3rd subscriber", pub)); 619 sub3.request(1); 620 T z2 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub)); 621 sub3.request(1); 622 T z3 = sub3.nextElement(String.format("Publisher %s did not produce the requested 1 element on 3rd subscriber", pub)); 623 sub3.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 3rd subscriber", pub)); 624 sub2.request(3); 625 List<T> y2 = sub2.nextElements(3, String.format("Publisher %s did not produce the requested 3 elements on 2nd subscriber", pub)); 626 sub2.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 2nd subscriber", pub)); 627 sub1.request(2); 628 List<T> x3 = sub1.nextElements(2, String.format("Publisher %s did not produce the requested 2 elements on 1st subscriber", pub)); 629 sub1.request(1); 630 T x4 = sub1.nextElement(String.format("Publisher %s did not produce the requested 1 element on 1st subscriber", pub)); 631 sub1.requestEndOfStream(String.format("Publisher %s did not complete the stream as expected on 1st subscriber", pub)); 632 633 @SuppressWarnings("unchecked") 634 List<T> r = new ArrayList<T>(Arrays.asList(x1, x2)); 635 r.addAll(x3); 636 r.addAll(Collections.singleton(x4)); 637 638 List<T> check1 = new ArrayList<T>(y1); 639 check1.addAll(y2); 640 641 //noinspection unchecked 642 List<T> check2 = new ArrayList<T>(z1); 643 check2.add(z2); 644 check2.add(z3); 645 646 assertEquals(r, check1, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 2", pub)); 647 assertEquals(r, check2, String.format("Publisher %s did not produce the same element sequence for subscribers 1 and 3", pub)); 648 } 649 }); 650 } 651 652 @Override @Test 653 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { 654 optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements 655 @Override 656 public void run(Publisher<T> pub) throws Throwable { 657 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 658 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 659 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 660 661 List<T> received1 = new ArrayList<T>(); 662 List<T> received2 = new ArrayList<T>(); 663 List<T> received3 = new ArrayList<T>(); 664 665 // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains... 666 // edgy edge case? 667 sub1.request(4); 668 sub2.request(4); 669 sub3.request(4); 670 671 received1.addAll(sub1.nextElements(3)); 672 received2.addAll(sub2.nextElements(3)); 673 received3.addAll(sub3.nextElements(3)); 674 675 // NOTE: can't check completion, the Publisher may not be able to signal it 676 // a similar test *with* completion checking is implemented 677 678 assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers")); 679 assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers")); 680 } 681 }); 682 } 683 684 @Override @Test 685 public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { 686 optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete) 687 @Override 688 public void run(Publisher<T> pub) throws Throwable { 689 ManualSubscriber<T> sub1 = env.newManualSubscriber(pub); 690 ManualSubscriber<T> sub2 = env.newManualSubscriber(pub); 691 ManualSubscriber<T> sub3 = env.newManualSubscriber(pub); 692 693 List<T> received1 = new ArrayList<T>(); 694 List<T> received2 = new ArrayList<T>(); 695 List<T> received3 = new ArrayList<T>(); 696 697 // if the publisher must touch it's source to notice it's been drained, the OnComplete won't come until we ask for more than it actually contains... 698 // edgy edge case? 699 sub1.request(4); 700 sub2.request(4); 701 sub3.request(4); 702 703 received1.addAll(sub1.nextElements(3)); 704 received2.addAll(sub2.nextElements(3)); 705 received3.addAll(sub3.nextElements(3)); 706 707 sub1.expectCompletion(); 708 sub2.expectCompletion(); 709 sub3.expectCompletion(); 710 711 assertEquals(received1, received2, String.format("Expected elements to be signaled in the same sequence to 1st and 2nd subscribers")); 712 assertEquals(received2, received3, String.format("Expected elements to be signaled in the same sequence to 2nd and 3rd subscribers")); 713 } 714 }); 715 } 716 717 ///////////////////// SUBSCRIPTION TESTS ////////////////////////////////// 718 719 @Override @Test 720 public void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable { 721 activePublisherTest(6, false, new PublisherTestRun<T>() { 722 @Override 723 public void run(Publisher<T> pub) throws Throwable { 724 ManualSubscriber<T> sub = new ManualSubscriber<T>(env) { 725 @Override 726 public void onSubscribe(Subscription subs) { 727 this.subscription.completeImmediatly(subs); 728 729 subs.request(1); 730 subs.request(1); 731 subs.request(1); 732 } 733 734 @Override 735 public void onNext(T element) { 736 Subscription subs = this.subscription.value(); 737 subs.request(1); 738 } 739 }; 740 741 env.subscribe(pub, sub); 742 743 env.verifyNoAsyncErrors(); 744 } 745 }); 746 } 747 748 @Override @Test 749 public void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable { 750 final long oneMoreThanBoundedLimit = boundedDepthOfOnNextAndRequestRecursion() + 1; 751 752 activePublisherTest(oneMoreThanBoundedLimit, false, new PublisherTestRun<T>() { 753 @Override 754 public void run(Publisher<T> pub) throws Throwable { 755 final ThreadLocal<Long> stackDepthCounter = new ThreadLocal<Long>() { 756 @Override 757 protected Long initialValue() { 758 return 0L; 759 } 760 }; 761 762 final Latch runCompleted = new Latch(env); 763 764 final ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 765 // counts the number of signals received, used to break out from possibly infinite request/onNext loops 766 long signalsReceived = 0L; 767 768 @Override 769 public void onNext(T element) { 770 // NOT calling super.onNext as this test only cares about stack depths, not the actual values of elements 771 // which also simplifies this test as we do not have to drain the test buffer, which would otherwise be in danger of overflowing 772 773 signalsReceived += 1; 774 stackDepthCounter.set(stackDepthCounter.get() + 1); 775 if (env.debugEnabled()) { 776 env.debug(String.format("%s(recursion depth: %d)::onNext(%s)", this, stackDepthCounter.get(), element)); 777 } 778 779 final long callsUntilNow = stackDepthCounter.get(); 780 if (callsUntilNow > boundedDepthOfOnNextAndRequestRecursion()) { 781 env.flop(String.format("Got %d onNext calls within thread: %s, yet expected recursive bound was %d", 782 callsUntilNow, Thread.currentThread(), boundedDepthOfOnNextAndRequestRecursion())); 783 784 // stop the recursive call chain 785 runCompleted.close(); 786 return; 787 } else if (signalsReceived >= oneMoreThanBoundedLimit) { 788 // since max number of signals reached, and recursion depth not exceeded, we judge this as a success and 789 // stop the recursive call chain 790 runCompleted.close(); 791 return; 792 } 793 794 // request more right away, the Publisher must break the recursion 795 subscription.value().request(1); 796 797 stackDepthCounter.set(stackDepthCounter.get() - 1); 798 } 799 800 @Override 801 public void onComplete() { 802 super.onComplete(); 803 runCompleted.close(); 804 } 805 806 @Override 807 public void onError(Throwable cause) { 808 super.onError(cause); 809 runCompleted.close(); 810 } 811 }; 812 813 try { 814 env.subscribe(pub, sub); 815 816 sub.request(1); // kick-off the `request -> onNext -> request -> onNext -> ...` 817 818 final String msg = String.format("Unable to validate call stack depth safety, " + 819 "awaited at-most %s signals (`maxOnNextSignalsInRecursionTest()`) or completion", 820 oneMoreThanBoundedLimit); 821 runCompleted.expectClose(env.defaultTimeoutMillis(), msg); 822 env.verifyNoAsyncErrorsNoDelay(); 823 } finally { 824 // since the request/onNext recursive calls may keep the publisher running "forever", 825 // we MUST cancel it manually before exiting this test case 826 sub.cancel(); 827 } 828 } 829 }); 830 } 831 832 @Override @Test 833 public void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception { 834 notVerified(); // cannot be meaningfully tested, or can it? 835 } 836 837 @Override @Test 838 public void untested_spec305_cancelMustNotSynchronouslyPerformHeavyComputation() throws Exception { 839 notVerified(); // cannot be meaningfully tested, or can it? 840 } 841 842 @Override @Test 843 public void required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() throws Throwable { 844 activePublisherTest(3, false, new PublisherTestRun<T>() { 845 @Override 846 public void run(Publisher<T> pub) throws Throwable { 847 848 // override ManualSubscriberWithSubscriptionSupport#cancel because by default a ManualSubscriber will drop the 849 // subscription once it's cancelled (as expected). 850 // In this test however it must keep the cancelled Subscription and keep issuing `request(long)` to it. 851 ManualSubscriber<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(env) { 852 @Override 853 public void cancel() { 854 if (subscription.isCompleted()) { 855 subscription.value().cancel(); 856 } else { 857 env.flop("Cannot cancel a subscription before having received it"); 858 } 859 } 860 }; 861 862 env.subscribe(pub, sub); 863 864 sub.cancel(); 865 sub.request(1); 866 sub.request(1); 867 sub.request(1); 868 869 sub.expectNone(); 870 env.verifyNoAsyncErrorsNoDelay(); 871 } 872 }); 873 } 874 875 @Override @Test 876 public void required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() throws Throwable { 877 activePublisherTest(1, false, new PublisherTestRun<T>() { 878 @Override 879 public void run(Publisher<T> pub) throws Throwable { 880 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 881 882 // leak the Subscription 883 final Subscription subs = sub.subscription.value(); 884 885 subs.cancel(); 886 subs.cancel(); 887 subs.cancel(); 888 889 sub.expectNone(); 890 env.verifyNoAsyncErrorsNoDelay(); 891 } 892 }); 893 } 894 895 @Override @Test 896 public void required_spec309_requestZeroMustSignalIllegalArgumentException() throws Throwable { 897 activePublisherTest(10, false, new PublisherTestRun<T>() { 898 @Override public void run(Publisher<T> pub) throws Throwable { 899 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 900 sub.request(0); 901 sub.expectError(IllegalArgumentException.class); 902 } 903 }); 904 } 905 906 @Override @Test 907 public void required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() throws Throwable { 908 activePublisherTest(10, false, new PublisherTestRun<T>() { 909 @Override 910 public void run(Publisher<T> pub) throws Throwable { 911 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 912 final Random r = new Random(); 913 sub.request(-r.nextInt(Integer.MAX_VALUE) - 1); 914 // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem 915 sub.expectError(IllegalArgumentException.class); 916 } 917 }); 918 } 919 920 @Override @Test 921 public void optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() throws Throwable { 922 optionalActivePublisherTest(10, false, new PublisherTestRun<T>() { 923 @Override 924 public void run(Publisher<T> pub) throws Throwable { 925 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 926 final Random r = new Random(); 927 sub.request(-r.nextInt(Integer.MAX_VALUE) - 1); 928 // we do require implementations to mention the rule number at the very least, or mentioning that the non-negative request is the problem 929 sub.expectErrorWithMessage(IllegalArgumentException.class, Arrays.asList("3.9", "non-positive subscription request", "negative subscription request")); 930 } 931 }); 932 } 933 934 @Override @Test 935 public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable { 936 // the publisher is able to signal more elements than the subscriber will be requesting in total 937 final int publisherElements = 20; 938 939 final int demand1 = 10; 940 final int demand2 = 5; 941 final int totalDemand = demand1 + demand2; 942 943 activePublisherTest(publisherElements, false, new PublisherTestRun<T>() { 944 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 945 public void run(Publisher<T> pub) throws Throwable { 946 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 947 948 sub.request(demand1); 949 sub.request(demand2); 950 951 /* 952 NOTE: The order of the nextElement/cancel calls below is very important (!) 953 954 If this ordering was reversed, given an asynchronous publisher, 955 the following scenario would be *legal* and would break this test: 956 957 > AsyncPublisher receives request(10) - it does not emit data right away, it's asynchronous 958 > AsyncPublisher receives request(5) - demand is now 15 959 ! AsyncPublisher didn't emit any onNext yet (!) 960 > AsyncPublisher receives cancel() - handles it right away, by "stopping itself" for example 961 ! cancel was handled hefore the AsyncPublisher ever got the chance to emit data 962 ! the subscriber ends up never receiving even one element - the test is stuck (and fails, even on valid Publisher) 963 964 Which is why we must first expect an element, and then cancel, once the producing is "running". 965 */ 966 sub.nextElement(); 967 sub.cancel(); 968 969 int onNextsSignalled = 1; 970 971 boolean stillBeingSignalled; 972 do { 973 // put asyncError if onNext signal received 974 sub.expectNone(); 975 Throwable error = env.dropAsyncError(); 976 977 if (error == null) { 978 stillBeingSignalled = false; 979 } else { 980 onNextsSignalled += 1; 981 stillBeingSignalled = true; 982 } 983 984 // if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw 985 assertTrue(onNextsSignalled <= totalDemand, 986 String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d", 987 onNextsSignalled, totalDemand)); 988 989 } while (stillBeingSignalled); 990 } 991 }); 992 993 env.verifyNoAsyncErrorsNoDelay(); 994 } 995 996 @Override @Test 997 public void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable { 998 final ReferenceQueue<ManualSubscriber<T>> queue = new ReferenceQueue<ManualSubscriber<T>>(); 999 1000 final Function<Publisher<T>, WeakReference<ManualSubscriber<T>>> run = new Function<Publisher<T>, WeakReference<ManualSubscriber<T>>>() { 1001 @Override 1002 public WeakReference<ManualSubscriber<T>> apply(Publisher<T> pub) throws Exception { 1003 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 1004 final WeakReference<ManualSubscriber<T>> ref = new WeakReference<ManualSubscriber<T>>(sub, queue); 1005 1006 sub.request(1); 1007 sub.nextElement(); 1008 sub.cancel(); 1009 1010 return ref; 1011 } 1012 }; 1013 1014 activePublisherTest(3, false, new PublisherTestRun<T>() { 1015 @Override 1016 public void run(Publisher<T> pub) throws Throwable { 1017 final WeakReference<ManualSubscriber<T>> ref = run.apply(pub); 1018 1019 // cancel may be run asynchronously so we add a sleep before running the GC 1020 // to "resolve" the race 1021 Thread.sleep(publisherReferenceGCTimeoutMillis); 1022 System.gc(); 1023 1024 if (!ref.equals(queue.remove(100))) { 1025 env.flop(String.format("Publisher %s did not drop reference to test subscriber after subscription cancellation", pub)); 1026 } 1027 1028 env.verifyNoAsyncErrorsNoDelay(); 1029 } 1030 }); 1031 } 1032 1033 @Override @Test 1034 public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable { 1035 final int totalElements = 3; 1036 1037 activePublisherTest(totalElements, true, new PublisherTestRun<T>() { 1038 @Override 1039 public void run(Publisher<T> pub) throws Throwable { 1040 ManualSubscriber<T> sub = env.newManualSubscriber(pub); 1041 sub.request(Long.MAX_VALUE); 1042 1043 sub.nextElements(totalElements); 1044 sub.expectCompletion(); 1045 1046 env.verifyNoAsyncErrorsNoDelay(); 1047 } 1048 }); 1049 } 1050 1051 @Override @Test 1052 public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable { 1053 final int totalElements = 3; 1054 1055 activePublisherTest(totalElements, true, new PublisherTestRun<T>() { 1056 @Override 1057 public void run(Publisher<T> pub) throws Throwable { 1058 final ManualSubscriber<T> sub = env.newManualSubscriber(pub); 1059 sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2 1060 sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1 1061 sub.request(1); // pending = Long.MAX_VALUE 1062 1063 sub.nextElements(totalElements); 1064 sub.expectCompletion(); 1065 1066 try { 1067 env.verifyNoAsyncErrorsNoDelay(); 1068 } finally { 1069 sub.cancel(); 1070 } 1071 1072 } 1073 }); 1074 } 1075 1076 @Override @Test 1077 public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable { 1078 activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() { 1079 @Override public void run(Publisher<T> pub) throws Throwable { 1080 final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) { 1081 // arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls, 1082 // so 10 is relatively high and safe even if arbitrarily chosen 1083 int callsCounter = 10; 1084 1085 @Override 1086 public void onNext(T element) { 1087 if (env.debugEnabled()) { 1088 env.debug(String.format("%s::onNext(%s)", this, element)); 1089 } 1090 if (subscription.isCompleted()) { 1091 if (callsCounter > 0) { 1092 subscription.value().request(Long.MAX_VALUE - 1); 1093 callsCounter--; 1094 } else { 1095 subscription.value().cancel(); 1096 } 1097 } else { 1098 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 1099 } 1100 } 1101 }; 1102 env.subscribe(pub, sub, env.defaultTimeoutMillis()); 1103 1104 // eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)` 1105 // we're pretty sure to overflow from those 1106 sub.request(1); 1107 1108 // no onError should be signalled 1109 try { 1110 env.verifyNoAsyncErrors(); 1111 } finally { 1112 sub.cancel(); 1113 } 1114 } 1115 }); 1116 } 1117 1118 ///////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////// 1119 1120 ///////////////////// TEST INFRASTRUCTURE ///////////////////////////////// 1121 1122 public interface PublisherTestRun<T> { 1123 public void run(Publisher<T> pub) throws Throwable; 1124 } 1125 1126 /** 1127 * Test for feature that SHOULD/MUST be implemented, using a live publisher. 1128 * 1129 * @param elements the number of elements the Publisher under test must be able to emit to run this test 1130 * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run. 1131 * If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped. 1132 * To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}. 1133 */ 1134 public void activePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable { 1135 if (elements > maxElementsFromPublisher()) { 1136 throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher())); 1137 } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) { 1138 throw new SkipException("Unable to run this test, as it requires an onComplete signal, " + 1139 "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)"); 1140 } else { 1141 Publisher<T> pub = createPublisher(elements); 1142 body.run(pub); 1143 env.verifyNoAsyncErrorsNoDelay(); 1144 } 1145 } 1146 1147 /** 1148 * Test for feature that MAY be implemented. This test will be marked as SKIPPED if it fails. 1149 * 1150 * @param elements the number of elements the Publisher under test must be able to emit to run this test 1151 * @param completionSignalRequired true if an {@code onComplete} signal is required by this test to run. 1152 * If the tested Publisher is unable to signal completion, tests requireing onComplete signals will be skipped. 1153 * To signal if your Publisher is able to signal completion see {@link PublisherVerification#maxElementsFromPublisher()}. 1154 */ 1155 public void optionalActivePublisherTest(long elements, boolean completionSignalRequired, PublisherTestRun<T> body) throws Throwable { 1156 if (elements > maxElementsFromPublisher()) { 1157 throw new SkipException(String.format("Unable to run this test, as required elements nr: %d is higher than supported by given producer: %d", elements, maxElementsFromPublisher())); 1158 } else if (completionSignalRequired && maxElementsFromPublisher() == Long.MAX_VALUE) { 1159 throw new SkipException("Unable to run this test, as it requires an onComplete signal, " + 1160 "which this Publisher is unable to provide (as signalled by returning Long.MAX_VALUE from `maxElementsFromPublisher()`)"); 1161 } else { 1162 1163 final Publisher<T> pub = createPublisher(elements); 1164 final String skipMessage = "Skipped because tested publisher does NOT implement this OPTIONAL requirement."; 1165 1166 try { 1167 potentiallyPendingTest(pub, body); 1168 } catch (Exception ex) { 1169 notVerified(skipMessage); 1170 } catch (AssertionError ex) { 1171 notVerified(skipMessage + " Reason for skipping was: " + ex.getMessage()); 1172 } 1173 } 1174 } 1175 1176 public static final String SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE = 1177 "Skipping because no error state Publisher provided, and the test requires it. " + 1178 "Please implement PublisherVerification#createFailedPublisher to run this test."; 1179 1180 public static final String SKIPPING_OPTIONAL_TEST_FAILED = 1181 "Skipping, because provided Publisher does not pass this *additional* verification."; 1182 /** 1183 * Additional test for Publisher in error state 1184 */ 1185 public void whenHasErrorPublisherTest(PublisherTestRun<T> body) throws Throwable { 1186 potentiallyPendingTest(createFailedPublisher(), body, SKIPPING_NO_ERROR_PUBLISHER_AVAILABLE); 1187 } 1188 1189 public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body) throws Throwable { 1190 potentiallyPendingTest(pub, body, SKIPPING_OPTIONAL_TEST_FAILED); 1191 } 1192 1193 public void potentiallyPendingTest(Publisher<T> pub, PublisherTestRun<T> body, String message) throws Throwable { 1194 if (pub != null) { 1195 body.run(pub); 1196 } else { 1197 throw new SkipException(message); 1198 } 1199 } 1200 1201 /** 1202 * Executes a given test body {@code n} times. 1203 * All the test runs must pass in order for the stochastic test to pass. 1204 */ 1205 public void stochasticTest(int n, Function<Integer, Void> body) throws Throwable { 1206 if (skipStochasticTests()) { 1207 notVerified("Skipping @Stochastic test because `skipStochasticTests()` returned `true`!"); 1208 } 1209 1210 for (int i = 0; i < n; i++) { 1211 body.apply(i); 1212 } 1213 } 1214 1215 public void notVerified() { 1216 throw new SkipException("Not verified by this TCK."); 1217 } 1218 1219 /** 1220 * Return this value from {@link PublisherVerification#maxElementsFromPublisher()} to mark that the given {@link org.reactivestreams.Publisher}, 1221 * is not able to signal completion. For example it is strictly a time-bound or unbounded source of data. 1222 * 1223 * <b>Returning this value from {@link PublisherVerification#maxElementsFromPublisher()} will result in skipping all TCK tests which require onComplete signals!</b> 1224 */ 1225 public long publisherUnableToSignalOnComplete() { 1226 return Long.MAX_VALUE; 1227 } 1228 1229 public void notVerified(String message) { 1230 throw new SkipException(message); 1231 } 1232 1233}