001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams.tck; 013 014import org.reactivestreams.Publisher; 015import org.reactivestreams.Subscriber; 016import org.reactivestreams.Subscription; 017import org.reactivestreams.tck.support.SubscriberBufferOverflowException; 018import org.reactivestreams.tck.support.Optional; 019 020import java.util.Collections; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.concurrent.ArrayBlockingQueue; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.TimeUnit; 027 028import static org.testng.Assert.assertTrue; 029import static org.testng.Assert.fail; 030 031public class TestEnvironment { 032 public static final int TEST_BUFFER_SIZE = 16; 033 034 private static final String DEFAULT_TIMEOUT_MILLIS_ENV = "DEFAULT_TIMEOUT_MILLIS"; 035 private static final long DEFAULT_TIMEOUT_MILLIS = 100; 036 037 private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS"; 038 039 private final long defaultTimeoutMillis; 040 private final long defaultNoSignalsTimeoutMillis; 041 private final boolean printlnDebug; 042 043 private CopyOnWriteArrayList<Throwable> asyncErrors = new CopyOnWriteArrayList<Throwable>(); 044 045 /** 046 * Tests must specify the timeout for expected outcome of asynchronous 047 * interactions. Longer timeout does not invalidate the correctness of 048 * the implementation, but can in some cases result in longer time to 049 * run the tests. 050 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 051 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 052 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 053 */ 054 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) { 055 this.defaultTimeoutMillis = defaultTimeoutMillis; 056 this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis; 057 this.printlnDebug = printlnDebug; 058 } 059 060 /** 061 * Tests must specify the timeout for expected outcome of asynchronous 062 * interactions. Longer timeout does not invalidate the correctness of 063 * the implementation, but can in some cases result in longer time to 064 * run the tests. 065 * 066 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 067 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 068 */ 069 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) { 070 this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, false); 071 } 072 073 /** 074 * Tests must specify the timeout for expected outcome of asynchronous 075 * interactions. Longer timeout does not invalidate the correctness of 076 * the implementation, but can in some cases result in longer time to 077 * run the tests. 078 * 079 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 080 */ 081 public TestEnvironment(long defaultTimeoutMillis) { 082 this(defaultTimeoutMillis, defaultTimeoutMillis, false); 083 } 084 085 /** 086 * Tests must specify the timeout for expected outcome of asynchronous 087 * interactions. Longer timeout does not invalidate the correctness of 088 * the implementation, but can in some cases result in longer time to 089 * run the tests. 090 * 091 * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} 092 * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. 093 * 094 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 095 * often helpful to pinpoint simple race conditions etc. 096 */ 097 public TestEnvironment(boolean printlnDebug) { 098 this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), printlnDebug); 099 } 100 101 /** 102 * Tests must specify the timeout for expected outcome of asynchronous 103 * interactions. Longer timeout does not invalidate the correctness of 104 * the implementation, but can in some cases result in longer time to 105 * run the tests. 106 * 107 * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} 108 * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. 109 */ 110 public TestEnvironment() { 111 this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis()); 112 } 113 114 /** This timeout is used when waiting for a signal to arrive. */ 115 public long defaultTimeoutMillis() { 116 return defaultTimeoutMillis; 117 } 118 119 /** 120 * This timeout is used when asserting that no further signals are emitted. 121 * Note that this timeout default 122 */ 123 public long defaultNoSignalsTimeoutMillis() { 124 return defaultNoSignalsTimeoutMillis; 125 } 126 127 /** 128 * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value. 129 * 130 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 131 */ 132 public static long envDefaultTimeoutMillis() { 133 final String envMillis = System.getenv(DEFAULT_TIMEOUT_MILLIS_ENV); 134 if (envMillis == null) return DEFAULT_TIMEOUT_MILLIS; 135 else try { 136 return Long.parseLong(envMillis); 137 } catch (NumberFormatException ex) { 138 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_TIMEOUT_MILLIS_ENV, envMillis), ex); 139 } 140 } 141 142 /** 143 * Tries to parse the env variable {@code DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS} as long and returns the value if present OR its default value. 144 * 145 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 146 */ 147 public static long envDefaultNoSignalsTimeoutMillis() { 148 final String envMillis = System.getenv(DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV); 149 if (envMillis == null) return envDefaultTimeoutMillis(); 150 else try { 151 return Long.parseLong(envMillis); 152 } catch (NumberFormatException ex) { 153 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV, envMillis), ex); 154 } 155 } 156 157 /** 158 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 159 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 160 * 161 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 162 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 163 * from the environment using {@code env.dropAsyncError()}. 164 * 165 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 166 */ 167 public void flop(String msg) { 168 try { 169 fail(msg); 170 } catch (Throwable t) { 171 asyncErrors.add(t); 172 } 173 } 174 175 /** 176 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 177 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 178 * 179 * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this. 180 * 181 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 182 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 183 * from the environment using {@code env.dropAsyncError()}. 184 * 185 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 186 */ 187 public void flop(Throwable thr, String msg) { 188 try { 189 fail(msg, thr); 190 } catch (Throwable t) { 191 asyncErrors.add(thr); 192 } 193 } 194 195 /** 196 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 197 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 198 * 199 * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this. 200 * 201 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 202 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 203 * from the environment using {@code env.dropAsyncError()}. 204 * 205 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 206 */ 207 public void flop(Throwable thr) { 208 try { 209 fail(thr.getMessage(), thr); 210 } catch (Throwable t) { 211 asyncErrors.add(thr); 212 } 213 } 214 215 /** 216 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 217 * 218 * This method DOES fail the test right away (it tries to, by throwing an AssertionException), 219 * in such it is different from {@link org.reactivestreams.tck.TestEnvironment#flop} which only records the error. 220 * 221 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 222 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 223 * from the environment using {@code env.dropAsyncError()}. 224 * 225 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 226 */ 227 public <T> T flopAndFail(String msg) { 228 try { 229 fail(msg); 230 } catch (Throwable t) { 231 asyncErrors.add(t); 232 fail(msg, t); 233 } 234 return null; // unreachable, the previous block will always exit by throwing 235 } 236 237 238 239 public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException { 240 subscribe(pub, sub, defaultTimeoutMillis); 241 } 242 243 public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException { 244 pub.subscribe(sub); 245 sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub)); 246 verifyNoAsyncErrorsNoDelay(); 247 } 248 249 public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException { 250 ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this); 251 subscribe(pub, sub, defaultTimeoutMillis()); 252 return sub; 253 } 254 255 public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException { 256 return newManualSubscriber(pub, defaultTimeoutMillis()); 257 } 258 259 public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException { 260 ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this); 261 subscribe(pub, sub, timeoutMillis); 262 return sub; 263 } 264 265 public void clearAsyncErrors() { 266 asyncErrors.clear(); 267 } 268 269 public Throwable dropAsyncError() { 270 try { 271 return asyncErrors.remove(0); 272 } catch (IndexOutOfBoundsException ex) { 273 return null; 274 } 275 } 276 277 /** 278 * Waits for {@link TestEnvironment#defaultTimeoutMillis()} and then verifies that no asynchronous errors 279 * were signalled pior to, or during that time (by calling {@code flop()}). 280 */ 281 public void verifyNoAsyncErrors() { 282 verifyNoAsyncErrors(defaultNoSignalsTimeoutMillis()); 283 } 284 285 /** 286 * This version of {@code verifyNoAsyncErrors} should be used when errors still could be signalled 287 * asynchronously during {@link TestEnvironment#defaultTimeoutMillis()} time. 288 * <p></p> 289 * It will immediatly check if any async errors were signaled (using {@link TestEnvironment#flop(String)}, 290 * and if no errors encountered wait for another default timeout as the errors may yet be signalled. 291 * The initial check is performed in order to fail-fast in case of an already failed test. 292 */ 293 public void verifyNoAsyncErrors(long delay) { 294 try { 295 verifyNoAsyncErrorsNoDelay(); 296 297 Thread.sleep(delay); 298 verifyNoAsyncErrorsNoDelay(); 299 } catch (InterruptedException e) { 300 throw new RuntimeException(e); 301 } 302 } 303 304 /** 305 * Verifies that no asynchronous errors were signalled pior to calling this method (by calling {@code flop()}). 306 * This version of verifyNoAsyncError <b>does not wait before checking for asynchronous errors</b>, and is to be used 307 * for example in tight loops etc. 308 */ 309 public void verifyNoAsyncErrorsNoDelay() { 310 for (Throwable e : asyncErrors) { 311 if (e instanceof AssertionError) { 312 throw (AssertionError) e; 313 } else { 314 fail(String.format("Async error during test execution: %s", e.getMessage()), e); 315 } 316 } 317 } 318 319 /** If {@code TestEnvironment#printlnDebug} is true, print debug message to std out. */ 320 public void debug(String msg) { 321 if (printlnDebug) 322 System.out.printf("[TCK-DEBUG] %s%n", msg); 323 } 324 325 /** 326 * Looks for given {@code method} method in stack trace. 327 * Can be used to answer questions like "was this method called from onComplete?". 328 * 329 * @return the caller's StackTraceElement at which he the looked for method was found in the call stack, EMPTY otherwise 330 */ 331 public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) { 332 final Throwable thr = new Throwable(); // gets the stacktrace 333 334 for (StackTraceElement stackElement : thr.getStackTrace()) { 335 if (stackElement.getMethodName().equals(method)) { 336 return Optional.of(stackElement); 337 } 338 } 339 return Optional.empty(); 340 } 341 342 // ---- classes ---- 343 344 /** 345 * {@link Subscriber} implementation which can be steered by test code and asserted on. 346 */ 347 public static class ManualSubscriber<T> extends TestSubscriber<T> { 348 Receptacle<T> received; 349 350 public ManualSubscriber(TestEnvironment env) { 351 super(env); 352 received = new Receptacle<T>(this.env); 353 } 354 355 @Override 356 public void onNext(T element) { 357 try { 358 received.add(element); 359 } catch (IllegalStateException ex) { 360 // error message refinement 361 throw new SubscriberBufferOverflowException( 362 String.format("Received more than bufferSize (%d) onNext signals. " + 363 "The Publisher probably emited more signals than expected!", 364 received.QUEUE_SIZE), ex); 365 } 366 } 367 368 @Override 369 public void onComplete() { 370 received.complete(); 371 } 372 373 public void request(long elements) { 374 subscription.value().request(elements); 375 } 376 377 public T requestNextElement() throws InterruptedException { 378 return requestNextElement(env.defaultTimeoutMillis()); 379 } 380 381 public T requestNextElement(long timeoutMillis) throws InterruptedException { 382 return requestNextElement(timeoutMillis, "Did not receive expected element"); 383 } 384 385 public T requestNextElement(String errorMsg) throws InterruptedException { 386 return requestNextElement(env.defaultTimeoutMillis(), errorMsg); 387 } 388 389 public T requestNextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 390 request(1); 391 return nextElement(timeoutMillis, errorMsg); 392 } 393 394 public Optional<T> requestNextElementOrEndOfStream() throws InterruptedException { 395 return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 396 } 397 398 public Optional<T> requestNextElementOrEndOfStream(String errorMsg) throws InterruptedException { 399 return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), errorMsg); 400 } 401 402 public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis) throws InterruptedException { 403 return requestNextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 404 } 405 406 public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 407 request(1); 408 return nextElementOrEndOfStream(timeoutMillis, errorMsg); 409 } 410 411 public void requestEndOfStream() throws InterruptedException { 412 requestEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 413 } 414 415 public void requestEndOfStream(long timeoutMillis) throws InterruptedException { 416 requestEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 417 } 418 419 public void requestEndOfStream(String errorMsg) throws InterruptedException { 420 requestEndOfStream(env.defaultTimeoutMillis(), errorMsg); 421 } 422 423 public void requestEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 424 request(1); 425 expectCompletion(timeoutMillis, errorMsg); 426 } 427 428 public List<T> requestNextElements(long elements) throws InterruptedException { 429 request(elements); 430 return nextElements(elements, env.defaultTimeoutMillis()); 431 } 432 433 public List<T> requestNextElements(long elements, long timeoutMillis) throws InterruptedException { 434 request(elements); 435 return nextElements(elements, timeoutMillis, String.format("Did not receive %d expected elements", elements)); 436 } 437 438 public List<T> requestNextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 439 request(elements); 440 return nextElements(elements, timeoutMillis, errorMsg); 441 } 442 443 public T nextElement() throws InterruptedException { 444 return nextElement(env.defaultTimeoutMillis()); 445 } 446 447 public T nextElement(long timeoutMillis) throws InterruptedException { 448 return nextElement(timeoutMillis, "Did not receive expected element"); 449 } 450 451 public T nextElement(String errorMsg) throws InterruptedException { 452 return nextElement(env.defaultTimeoutMillis(), errorMsg); 453 } 454 455 public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 456 return received.next(timeoutMillis, errorMsg); 457 } 458 459 public Optional<T> nextElementOrEndOfStream() throws InterruptedException { 460 return nextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 461 } 462 463 public Optional<T> nextElementOrEndOfStream(long timeoutMillis) throws InterruptedException { 464 return nextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 465 } 466 467 public Optional<T> nextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 468 return received.nextOrEndOfStream(timeoutMillis, errorMsg); 469 } 470 471 public List<T> nextElements(long elements) throws InterruptedException { 472 return nextElements(elements, env.defaultTimeoutMillis(), "Did not receive expected element or completion"); 473 } 474 475 public List<T> nextElements(long elements, String errorMsg) throws InterruptedException { 476 return nextElements(elements, env.defaultTimeoutMillis(), errorMsg); 477 } 478 479 public List<T> nextElements(long elements, long timeoutMillis) throws InterruptedException { 480 return nextElements(elements, timeoutMillis, "Did not receive expected element or completion"); 481 } 482 483 public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 484 return received.nextN(elements, timeoutMillis, errorMsg); 485 } 486 487 public void expectNext(T expected) throws InterruptedException { 488 expectNext(expected, env.defaultTimeoutMillis()); 489 } 490 491 public void expectNext(T expected, long timeoutMillis) throws InterruptedException { 492 T received = nextElement(timeoutMillis, "Did not receive expected element on downstream"); 493 if (!received.equals(expected)) { 494 env.flop(String.format("Expected element %s on downstream but received %s", expected, received)); 495 } 496 } 497 498 public void expectCompletion() throws InterruptedException { 499 expectCompletion(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 500 } 501 502 public void expectCompletion(long timeoutMillis) throws InterruptedException { 503 expectCompletion(timeoutMillis, "Did not receive expected stream completion"); 504 } 505 506 public void expectCompletion(String errorMsg) throws InterruptedException { 507 expectCompletion(env.defaultTimeoutMillis(), errorMsg); 508 } 509 510 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 511 received.expectCompletion(timeoutMillis, errorMsg); 512 } 513 514 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception { 515 expectErrorWithMessage(expected, requiredMessagePart, env.defaultTimeoutMillis()); 516 } 517 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives) throws Exception { 518 expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis()); 519 } 520 521 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 522 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception { 523 expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), timeoutMillis); 524 } 525 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, long timeoutMillis) throws Exception { 526 final E err = expectError(expected, timeoutMillis); 527 final String message = err.getMessage(); 528 529 boolean contains = false; 530 for (String requiredMessagePart : requiredMessagePartAlternatives) 531 if (message.contains(requiredMessagePart)) contains = true; // not short-circuting loop, it is expected to 532 assertTrue(contains, 533 String.format("Got expected exception [%s] but missing message part [%s], was: %s", 534 err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage())); 535 } 536 537 public <E extends Throwable> E expectError(Class<E> expected) throws Exception { 538 return expectError(expected, env.defaultTimeoutMillis()); 539 } 540 541 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception { 542 return expectError(expected, timeoutMillis, String.format("Expected onError(%s)", expected.getName())); 543 } 544 545 public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception { 546 return expectError(expected, env.defaultTimeoutMillis(), errorMsg); 547 } 548 549 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception { 550 return received.expectError(expected, timeoutMillis, errorMsg); 551 } 552 553 public void expectNone() throws InterruptedException { 554 expectNone(env.defaultNoSignalsTimeoutMillis()); 555 } 556 557 public void expectNone(String errMsgPrefix) throws InterruptedException { 558 expectNone(env.defaultNoSignalsTimeoutMillis(), errMsgPrefix); 559 } 560 561 public void expectNone(long withinMillis) throws InterruptedException { 562 expectNone(withinMillis, "Did not expect an element but got element"); 563 } 564 565 public void expectNone(long withinMillis, String errMsgPrefix) throws InterruptedException { 566 received.expectNone(withinMillis, errMsgPrefix); 567 } 568 569 } 570 571 public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> { 572 573 public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) { 574 super(env); 575 } 576 577 @Override 578 public void onNext(T element) { 579 env.debug(String.format("%s::onNext(%s)", this, element)); 580 if (subscription.isCompleted()) { 581 super.onNext(element); 582 } else { 583 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 584 } 585 } 586 587 @Override 588 public void onComplete() { 589 env.debug(this + "::onComplete()"); 590 if (subscription.isCompleted()) { 591 super.onComplete(); 592 } else { 593 env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe"); 594 } 595 } 596 597 @Override 598 public void onSubscribe(Subscription s) { 599 env.debug(String.format("%s::onSubscribe(%s)", this, s)); 600 if (!subscription.isCompleted()) { 601 subscription.complete(s); 602 } else { 603 env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber"); 604 } 605 } 606 607 @Override 608 public void onError(Throwable cause) { 609 env.debug(String.format("%s::onError(%s)", this, cause)); 610 if (subscription.isCompleted()) { 611 super.onError(cause); 612 } else { 613 env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause)); 614 } 615 } 616 } 617 618 /** 619 * Similar to {@link org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport} 620 * but does not accumulate values signalled via <code>onNext</code>, thus it can not be used to assert 621 * values signalled to this subscriber. Instead it may be used to quickly drain a given publisher. 622 */ 623 public static class BlackholeSubscriberWithSubscriptionSupport<T> 624 extends ManualSubscriberWithSubscriptionSupport<T> { 625 626 public BlackholeSubscriberWithSubscriptionSupport(TestEnvironment env) { 627 super(env); 628 } 629 630 @Override 631 public void onNext(T element) { 632 env.debug(String.format("%s::onNext(%s)", this, element)); 633 if (!subscription.isCompleted()) { 634 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 635 } 636 } 637 638 @Override 639 public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 640 throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!"); 641 } 642 643 @Override 644 public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 645 throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!"); 646 } 647 } 648 649 public static class TestSubscriber<T> implements Subscriber<T> { 650 final Promise<Subscription> subscription; 651 652 protected final TestEnvironment env; 653 654 public TestSubscriber(TestEnvironment env) { 655 this.env = env; 656 subscription = new Promise<Subscription>(env); 657 } 658 659 @Override 660 public void onError(Throwable cause) { 661 env.flop(cause, String.format("Unexpected Subscriber::onError(%s)", cause)); 662 } 663 664 @Override 665 public void onComplete() { 666 env.flop("Unexpected Subscriber::onComplete()"); 667 } 668 669 @Override 670 public void onNext(T element) { 671 env.flop(String.format("Unexpected Subscriber::onNext(%s)", element)); 672 } 673 674 @Override 675 public void onSubscribe(Subscription subscription) { 676 env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription)); 677 } 678 679 public void cancel() { 680 if (subscription.isCompleted()) { 681 subscription.value().cancel(); 682 } else { 683 env.flop("Cannot cancel a subscription before having received it"); 684 } 685 } 686 } 687 688 public static class ManualPublisher<T> implements Publisher<T> { 689 protected final TestEnvironment env; 690 691 protected long pendingDemand = 0L; 692 protected Promise<Subscriber<? super T>> subscriber; 693 694 protected final Receptacle<Long> requests; 695 696 protected final Latch cancelled; 697 698 public ManualPublisher(TestEnvironment env) { 699 this.env = env; 700 requests = new Receptacle<Long>(env); 701 cancelled = new Latch(env); 702 subscriber = new Promise<Subscriber<? super T>>(this.env); 703 } 704 705 @Override 706 public void subscribe(Subscriber<? super T> s) { 707 if (!subscriber.isCompleted()) { 708 subscriber.completeImmediatly(s); 709 710 Subscription subs = new Subscription() { 711 @Override 712 public void request(long elements) { 713 requests.add(elements); 714 } 715 716 @Override 717 public void cancel() { 718 cancelled.close(); 719 } 720 }; 721 s.onSubscribe(subs); 722 723 } else { 724 env.flop("TestPublisher doesn't support more than one Subscriber"); 725 } 726 } 727 728 public void sendNext(T element) { 729 if (subscriber.isCompleted()) { 730 subscriber.value().onNext(element); 731 } else { 732 env.flop("Cannot sendNext before having a Subscriber"); 733 } 734 } 735 736 public void sendCompletion() { 737 if (subscriber.isCompleted()) { 738 subscriber.value().onComplete(); 739 } else { 740 env.flop("Cannot sendCompletion before having a Subscriber"); 741 } 742 } 743 744 public void sendError(Throwable cause) { 745 if (subscriber.isCompleted()) { 746 subscriber.value().onError(cause); 747 } else { 748 env.flop("Cannot sendError before having a Subscriber"); 749 } 750 } 751 752 public long expectRequest() throws InterruptedException { 753 return expectRequest(env.defaultTimeoutMillis()); 754 } 755 756 public long expectRequest(long timeoutMillis) throws InterruptedException { 757 long requested = requests.next(timeoutMillis, "Did not receive expected `request` call"); 758 if (requested <= 0) { 759 return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested)); 760 } else { 761 pendingDemand += requested; 762 return requested; 763 } 764 } 765 766 public void expectExactRequest(long expected) throws InterruptedException { 767 expectExactRequest(expected, env.defaultTimeoutMillis()); 768 } 769 770 public void expectExactRequest(long expected, long timeoutMillis) throws InterruptedException { 771 long requested = expectRequest(timeoutMillis); 772 if (requested != expected) { 773 env.flop(String.format("Received `request(%d)` on upstream but expected `request(%d)`", requested, expected)); 774 } 775 pendingDemand += requested; 776 } 777 778 public void expectNoRequest() throws InterruptedException { 779 expectNoRequest(env.defaultTimeoutMillis()); 780 } 781 782 public void expectNoRequest(long timeoutMillis) throws InterruptedException { 783 requests.expectNone(timeoutMillis, "Received an unexpected call to: request: "); 784 } 785 786 public void expectCancelling() throws InterruptedException { 787 expectCancelling(env.defaultTimeoutMillis()); 788 } 789 790 public void expectCancelling(long timeoutMillis) throws InterruptedException { 791 cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription"); 792 } 793 794 public boolean isCancelled() throws InterruptedException { 795 return cancelled.isClosed(); 796 } 797 } 798 799 /** 800 * Like a CountDownLatch, but resettable and with some convenience methods 801 */ 802 public static class Latch { 803 private final TestEnvironment env; 804 volatile private CountDownLatch countDownLatch = new CountDownLatch(1); 805 806 public Latch(TestEnvironment env) { 807 this.env = env; 808 } 809 810 public void reOpen() { 811 countDownLatch = new CountDownLatch(1); 812 } 813 814 public boolean isClosed() { 815 return countDownLatch.getCount() == 0; 816 } 817 818 public void close() { 819 countDownLatch.countDown(); 820 } 821 822 public void assertClosed(String openErrorMsg) { 823 if (!isClosed()) { 824 env.flop(new ExpectedClosedLatchException(openErrorMsg)); 825 } 826 } 827 828 public void assertOpen(String closedErrorMsg) { 829 if (isClosed()) { 830 env.flop(new ExpectedOpenLatchException(closedErrorMsg)); 831 } 832 } 833 834 public void expectClose(String notClosedErrorMsg) throws InterruptedException { 835 expectClose(env.defaultTimeoutMillis(), notClosedErrorMsg); 836 } 837 838 public void expectClose(long timeoutMillis, String notClosedErrorMsg) throws InterruptedException { 839 countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); 840 if (countDownLatch.getCount() > 0) { 841 env.flop(String.format("%s within %d ms", notClosedErrorMsg, timeoutMillis)); 842 } 843 } 844 845 static final class ExpectedOpenLatchException extends RuntimeException { 846 public ExpectedOpenLatchException(String message) { 847 super(message); 848 } 849 } 850 851 static final class ExpectedClosedLatchException extends RuntimeException { 852 public ExpectedClosedLatchException(String message) { 853 super(message); 854 } 855 } 856 857 } 858 859 // simple promise for *one* value, which cannot be reset 860 public static class Promise<T> { 861 private final TestEnvironment env; 862 863 public static <T> Promise<T> completed(TestEnvironment env, T value) { 864 Promise<T> promise = new Promise<T>(env); 865 promise.completeImmediatly(value); 866 return promise; 867 } 868 869 public Promise(TestEnvironment env) { 870 this.env = env; 871 } 872 873 private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1); 874 private volatile T _value = null; 875 876 public T value() { 877 if (isCompleted()) { 878 return _value; 879 } else { 880 env.flop("Cannot access promise value before completion"); 881 return null; 882 } 883 } 884 885 public boolean isCompleted() { 886 return _value != null; 887 } 888 889 /** 890 * Allows using expectCompletion to await for completion of the value and complete it _then_ 891 */ 892 public void complete(T value) { 893 abq.add(value); 894 } 895 896 /** 897 * Completes the promise right away, it is not possible to expectCompletion on a Promise completed this way 898 */ 899 public void completeImmediatly(T value) { 900 complete(value); // complete! 901 _value = value; // immediatly! 902 } 903 904 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 905 if (!isCompleted()) { 906 T val = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 907 908 if (val == null) { 909 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 910 } else { 911 _value = val; 912 } 913 } 914 } 915 } 916 917 // a "Promise" for multiple values, which also supports "end-of-stream reached" 918 public static class Receptacle<T> { 919 final int QUEUE_SIZE = 2 * TEST_BUFFER_SIZE; 920 private final TestEnvironment env; 921 922 private final ArrayBlockingQueue<Optional<T>> abq = new ArrayBlockingQueue<Optional<T>>(QUEUE_SIZE); 923 924 private final Latch completedLatch; 925 926 Receptacle(TestEnvironment env) { 927 this.env = env; 928 this.completedLatch = new Latch(env); 929 } 930 931 public void add(T value) { 932 completedLatch.assertOpen(String.format("Unexpected element %s received after stream completed", value)); 933 934 abq.add(Optional.of(value)); 935 } 936 937 public void complete() { 938 completedLatch.assertOpen("Unexpected additional complete signal received!"); 939 completedLatch.close(); 940 941 abq.add(Optional.<T>empty()); 942 } 943 944 public T next(long timeoutMillis, String errorMsg) throws InterruptedException { 945 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 946 947 if (value == null) { 948 return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis)); 949 } else if (value.isDefined()) { 950 return value.get(); 951 } else { 952 return env.flopAndFail("Expected element but got end-of-stream"); 953 } 954 } 955 956 public Optional<T> nextOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 957 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 958 959 if (value == null) { 960 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 961 return Optional.empty(); 962 } 963 964 return value; 965 } 966 967 /** 968 * @param timeoutMillis total timeout time for awaiting all {@code elements} number of elements 969 */ 970 public List<T> nextN(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 971 List<T> result = new LinkedList<T>(); 972 long remaining = elements; 973 long deadline = System.currentTimeMillis() + timeoutMillis; 974 while (remaining > 0) { 975 long remainingMillis = deadline - System.currentTimeMillis(); 976 977 result.add(next(remainingMillis, errorMsg)); 978 remaining--; 979 } 980 981 return result; 982 } 983 984 985 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 986 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 987 988 if (value == null) { 989 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 990 } else if (value.isDefined()) { 991 env.flop(String.format("Expected end-of-stream but got element [%s]", value.get())); 992 } // else, ok 993 } 994 995 @SuppressWarnings("unchecked") 996 public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception { 997 Thread.sleep(timeoutMillis); 998 999 if (env.asyncErrors.isEmpty()) { 1000 return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis)); 1001 } else { 1002 // ok, there was an expected error 1003 Throwable thrown = env.asyncErrors.remove(0); 1004 1005 if (clazz.isInstance(thrown)) { 1006 return (E) thrown; 1007 } else { 1008 1009 return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s", 1010 errorMsg, timeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName())); 1011 } 1012 } 1013 } 1014 1015 public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException { 1016 Thread.sleep(withinMillis); 1017 Optional<T> value = abq.poll(); 1018 1019 if (value == null) { 1020 // ok 1021 } else if (value.isDefined()) { 1022 env.flop(String.format("%s [%s]", errorMsgPrefix, value.get())); 1023 } else { 1024 env.flop("Expected no element but got end-of-stream"); 1025 } 1026 } 1027 } 1028} 1029