001package org.reactivestreams.tck; 002 003import org.reactivestreams.Publisher; 004import org.reactivestreams.Subscriber; 005import org.reactivestreams.Subscription; 006import org.reactivestreams.tck.TestEnvironment.ManualPublisher; 007import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; 008import org.reactivestreams.tck.support.Optional; 009import org.reactivestreams.tck.support.SubscriberBlackboxVerificationRules; 010import org.reactivestreams.tck.support.TestException; 011import org.testng.SkipException; 012import org.testng.annotations.AfterClass; 013import org.testng.annotations.BeforeClass; 014import org.testng.annotations.BeforeMethod; 015import org.testng.annotations.Test; 016 017import java.util.concurrent.ExecutorService; 018import java.util.concurrent.Executors; 019 020import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy; 021import static org.testng.Assert.assertTrue; 022 023/** 024 * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription} 025 * specification rules, without any modifications to the tested implementation (also known as "Black Box" testing). 026 * 027 * This verification is NOT able to check many of the rules of the spec, and if you want more 028 * verification of your implementation you'll have to implement {@code org.reactivestreams.tck.SubscriberWhiteboxVerification} 029 * instead. 030 * 031 * @see org.reactivestreams.Subscriber 032 * @see org.reactivestreams.Subscription 033 */ 034public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T> 035 implements SubscriberBlackboxVerificationRules { 036 037 protected final TestEnvironment env; 038 039 protected SubscriberBlackboxVerification(TestEnvironment env) { 040 this.env = env; 041 } 042 043 // USER API 044 045 /** 046 * This is the main method you must implement in your test incarnation. 047 * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic. 048 */ 049 public abstract Subscriber<T> createSubscriber(); 050 051 /** 052 * Override this method if the Subscriber implementation you are verifying 053 * needs an external signal before it signals demand to its Publisher. 054 * 055 * By default this method does nothing. 056 */ 057 public void triggerRequest(final Subscriber<? super T> subscriber) { 058 059 } 060 061 // ENV SETUP 062 063 /** 064 * Executor service used by the default provided asynchronous Publisher. 065 * @see #createHelperPublisher(long) 066 */ 067 private ExecutorService publisherExecutor; 068 @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); } 069 @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); } 070 @Override public ExecutorService publisherExecutorService() { return publisherExecutor; } 071 072 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// 073 074 @BeforeMethod 075 public void setUp() throws Exception { 076 env.clearAsyncErrors(); 077 } 078 079 ////////////////////// SPEC RULE VERIFICATION /////////////////////////////// 080 081 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.1 082 @Override @Test 083 public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() throws Throwable { 084 blackboxSubscriberTest(new BlackboxTestStageTestRun() { 085 @Override 086 public void run(BlackboxTestStage stage) throws InterruptedException { 087 triggerRequest(stage.subProxy().sub()); 088 final long n = stage.expectRequest();// assuming subscriber wants to consume elements... 089 090 // should cope with up to requested number of elements 091 for (int i = 0; i < n; i++) 092 stage.signalNext(); 093 } 094 }); 095 } 096 097 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.2 098 @Override @Test 099 public void untested_spec202_blackbox_shouldAsynchronouslyDispatch() throws Exception { 100 notVerified(); // cannot be meaningfully tested, or can it? 101 } 102 103 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3 104 @Override @Test 105 public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable { 106 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 107 @Override 108 public void run(BlackboxTestStage stage) throws Throwable { 109 final Subscription subs = new Subscription() { 110 @Override 111 public void request(long n) { 112 final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete"); 113 if (onCompleteStackTraceElement.isDefined()) { 114 final StackTraceElement stackElem = onCompleteStackTraceElement.get(); 115 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", 116 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 117 } 118 } 119 120 @Override 121 public void cancel() { 122 final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete"); 123 if (onCompleteStackElement.isDefined()) { 124 final StackTraceElement stackElem = onCompleteStackElement.get(); 125 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", 126 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 127 } 128 } 129 }; 130 131 final Subscriber<T> sub = createSubscriber(); 132 sub.onSubscribe(subs); 133 sub.onComplete(); 134 135 env.verifyNoAsyncErrorsNoDelay(); 136 } 137 }); 138 } 139 140 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.3 141 @Override @Test 142 public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable { 143 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 144 @Override 145 public void run(BlackboxTestStage stage) throws Throwable { 146 final Subscription subs = new Subscription() { 147 @Override 148 public void request(long n) { 149 Throwable thr = new Throwable(); 150 for (StackTraceElement stackElem : thr.getStackTrace()) { 151 if (stackElem.getMethodName().equals("onError")) { 152 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", 153 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 154 } 155 } 156 } 157 158 @Override 159 public void cancel() { 160 Throwable thr = new Throwable(); 161 for (StackTraceElement stackElem : thr.getStackTrace()) { 162 if (stackElem.getMethodName().equals("onError")) { 163 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", 164 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); 165 } 166 } 167 } 168 }; 169 170 final Subscriber<T> sub = createSubscriber(); 171 sub.onSubscribe(subs); 172 sub.onError(new TestException()); 173 174 env.verifyNoAsyncErrorsNoDelay(); 175 } 176 }); 177 } 178 179 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.4 180 @Override @Test 181 public void untested_spec204_blackbox_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception { 182 notVerified(); // cannot be meaningfully tested, or can it? 183 } 184 185 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.5 186 @Override @Test 187 public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception { 188 new BlackboxTestStage(env) {{ 189 // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail 190 final TestEnvironment.Latch secondSubscriptionCancelled = new TestEnvironment.Latch(env); 191 sub().onSubscribe( 192 new Subscription() { 193 @Override 194 public void request(long elements) { 195 env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`!", sub(), elements)); 196 } 197 198 @Override 199 public void cancel() { 200 secondSubscriptionCancelled.close(); 201 } 202 203 @Override 204 public String toString() { 205 return "SecondSubscription(should get cancelled)"; 206 } 207 }); 208 209 secondSubscriptionCancelled.expectClose("Expected SecondSubscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called."); 210 env.verifyNoAsyncErrorsNoDelay(); 211 }}; 212 } 213 214 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.6 215 @Override @Test 216 public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { 217 notVerified(); // cannot be meaningfully tested, or can it? 218 } 219 220 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.7 221 @Override @Test 222 public void untested_spec207_blackbox_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception { 223 notVerified(); // cannot be meaningfully tested, or can it? 224 // the same thread part of the clause can be verified but that is not very useful, or is it? 225 } 226 227 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.8 228 @Override @Test 229 public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable { 230 notVerified(); // cannot be meaningfully tested as black box, or can it? 231 } 232 233 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9 234 @Override @Test 235 public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable { 236 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 237 @Override 238 public void run(BlackboxTestStage stage) throws Throwable { 239 final Publisher<T> pub = new Publisher<T>() { 240 @Override public void subscribe(final Subscriber<? super T> s) { 241 s.onSubscribe(new Subscription() { 242 private boolean completed = false; 243 244 @Override public void request(long n) { 245 if (!completed) { 246 completed = true; 247 s.onComplete(); // Publisher now realises that it is in fact already completed 248 } 249 } 250 251 @Override public void cancel() { 252 // noop, ignore 253 } 254 }); 255 } 256 }; 257 258 final Subscriber<T> sub = createSubscriber(); 259 final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub); 260 261 pub.subscribe(probe); 262 triggerRequest(sub); 263 probe.expectCompletion(); 264 probe.expectNone(); 265 266 env.verifyNoAsyncErrorsNoDelay(); 267 } 268 }); 269 } 270 271 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.9 272 @Override @Test 273 public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable { 274 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 275 @Override 276 public void run(BlackboxTestStage stage) throws Throwable { 277 final Publisher<T> pub = new Publisher<T>() { 278 @Override 279 public void subscribe(Subscriber<? super T> s) { 280 s.onComplete(); 281 } 282 }; 283 284 final Subscriber<T> sub = createSubscriber(); 285 final BlackboxSubscriberProxy<T> probe = stage.createBlackboxSubscriberProxy(env, sub); 286 287 pub.subscribe(probe); 288 probe.expectCompletion(); 289 290 env.verifyNoAsyncErrorsNoDelay(); 291 } 292 }); 293 } 294 295 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10 296 @Override @Test 297 public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable { 298 blackboxSubscriberTest(new BlackboxTestStageTestRun() { 299 @Override 300 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 301 public void run(BlackboxTestStage stage) throws Throwable { 302 stage.sub().onError(new TestException()); 303 stage.subProxy().expectError(Throwable.class); 304 } 305 }); 306 } 307 308 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.11 309 @Override @Test 310 public void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception { 311 notVerified(); // cannot be meaningfully tested, or can it? 312 } 313 314 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.12 315 @Override @Test 316 public void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable { 317 notVerified(); // cannot be meaningfully tested as black box, or can it? 318 } 319 320 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 321 @Override @Test 322 public void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception { 323 notVerified(); // cannot be meaningfully tested, or can it? 324 } 325 326 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 327 @Override @Test 328 public void required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 329 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 330 @Override 331 public void run(BlackboxTestStage stage) throws Throwable { 332 333 { 334 final Subscriber<T> sub = createSubscriber(); 335 boolean gotNPE = false; 336 try { 337 sub.onSubscribe(null); 338 } catch(final NullPointerException expected) { 339 gotNPE = true; 340 } 341 assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException"); 342 } 343 344 env.verifyNoAsyncErrorsNoDelay(); 345 } 346 }); 347 } 348 349 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 350 @Override @Test 351 public void required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 352 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 353 @Override 354 public void run(BlackboxTestStage stage) throws Throwable { 355 final Subscription subscription = new Subscription() { 356 @Override public void request(final long elements) {} 357 @Override public void cancel() {} 358 }; 359 360 { 361 final Subscriber<T> sub = createSubscriber(); 362 boolean gotNPE = false; 363 sub.onSubscribe(subscription); 364 try { 365 sub.onNext(null); 366 } catch(final NullPointerException expected) { 367 gotNPE = true; 368 } 369 assertTrue(gotNPE, "onNext(null) did not throw NullPointerException"); 370 } 371 372 env.verifyNoAsyncErrorsNoDelay(); 373 } 374 }); 375 } 376 377 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.13 378 @Override @Test 379 public void required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { 380 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { 381 @Override 382 public void run(BlackboxTestStage stage) throws Throwable { 383 final Subscription subscription = new Subscription() { 384 @Override public void request(final long elements) {} 385 @Override public void cancel() {} 386 }; 387 388 { 389 final Subscriber<T> sub = createSubscriber(); 390 boolean gotNPE = false; 391 sub.onSubscribe(subscription); 392 try { 393 sub.onError(null); 394 } catch(final NullPointerException expected) { 395 gotNPE = true; 396 } 397 assertTrue(gotNPE, "onError(null) did not throw NullPointerException"); 398 } 399 400 env.verifyNoAsyncErrorsNoDelay(); 401 } 402 }); 403 } 404 405 ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION ////////////////// 406 407 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.1 408 @Override @Test 409 public void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception { 410 notVerified(); // cannot be meaningfully tested, or can it? 411 } 412 413 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.8 414 @Override @Test 415 public void untested_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable { 416 notVerified(); // cannot be meaningfully tested as black box, or can it? 417 } 418 419 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.10 420 @Override @Test 421 public void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception { 422 notVerified(); // cannot be meaningfully tested, or can it? 423 } 424 425 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.11 426 @Override @Test 427 public void untested_spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception { 428 notVerified(); // cannot be meaningfully tested, or can it? 429 } 430 431 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.14 432 @Override @Test 433 public void untested_spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { 434 notVerified(); // cannot be meaningfully tested, or can it? 435 } 436 437 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.15 438 @Override @Test 439 public void untested_spec315_blackbox_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception { 440 notVerified(); // cannot be meaningfully tested, or can it? 441 } 442 443 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.16 444 @Override @Test 445 public void untested_spec316_blackbox_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception { 446 notVerified(); // cannot be meaningfully tested, or can it? 447 } 448 449 /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////// 450 451 /////////////////////// TEST INFRASTRUCTURE ///////////////////////////////// 452 453 abstract class BlackboxTestStageTestRun { 454 public abstract void run(BlackboxTestStage stage) throws Throwable; 455 } 456 457 public void blackboxSubscriberTest(BlackboxTestStageTestRun body) throws Throwable { 458 BlackboxTestStage stage = new BlackboxTestStage(env, true); 459 body.run(stage); 460 } 461 462 public void blackboxSubscriberWithoutSetupTest(BlackboxTestStageTestRun body) throws Throwable { 463 BlackboxTestStage stage = new BlackboxTestStage(env, false); 464 body.run(stage); 465 } 466 467 public class BlackboxTestStage extends ManualPublisher<T> { 468 public Publisher<T> pub; 469 public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values 470 471 public T lastT = null; 472 private Optional<BlackboxSubscriberProxy<T>> subProxy = Optional.empty(); 473 474 public BlackboxTestStage(TestEnvironment env) throws InterruptedException { 475 this(env, true); 476 } 477 478 public BlackboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException { 479 super(env); 480 if (runDefaultInit) { 481 pub = this.createHelperPublisher(Long.MAX_VALUE); 482 tees = env.newManualSubscriber(pub); 483 Subscriber<T> sub = createSubscriber(); 484 subProxy = Optional.of(createBlackboxSubscriberProxy(env, sub)); 485 subscribe(subProxy.get()); 486 } 487 } 488 489 public Subscriber<? super T> sub() { 490 return subscriber.value(); 491 } 492 493 /** 494 * Proxy for the {@link #sub()} {@code Subscriber}, providing certain assertions on methods being called on the Subscriber. 495 */ 496 public BlackboxSubscriberProxy<T> subProxy() { 497 return subProxy.get(); 498 } 499 500 public Publisher<T> createHelperPublisher(long elements) { 501 return SubscriberBlackboxVerification.this.createHelperPublisher(elements); 502 } 503 504 public BlackboxSubscriberProxy<T> createBlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> sub) { 505 return new BlackboxSubscriberProxy<T>(env, sub); 506 } 507 508 public T signalNext() throws InterruptedException { 509 T element = nextT(); 510 sendNext(element); 511 return element; 512 } 513 514 public T nextT() throws InterruptedException { 515 lastT = tees.requestNextElement(); 516 return lastT; 517 } 518 519 } 520 521 public void notVerified() { 522 throw new SkipException("Not verified using this TCK."); 523 } 524}