001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams.tck; 013 014import org.reactivestreams.Publisher; 015import org.reactivestreams.Subscriber; 016import org.reactivestreams.Subscription; 017import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException; 018import org.reactivestreams.tck.flow.support.Optional; 019 020import java.util.Collections; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.concurrent.ArrayBlockingQueue; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.TimeUnit; 027 028import static org.testng.Assert.assertTrue; 029import static org.testng.Assert.fail; 030 031public class TestEnvironment { 032 public static final int TEST_BUFFER_SIZE = 16; 033 034 private static final String DEFAULT_TIMEOUT_MILLIS_ENV = "DEFAULT_TIMEOUT_MILLIS"; 035 private static final long DEFAULT_TIMEOUT_MILLIS = 100; 036 037 private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS"; 038 039 private final long defaultTimeoutMillis; 040 private final long defaultNoSignalsTimeoutMillis; 041 private final boolean printlnDebug; 042 043 private CopyOnWriteArrayList<Throwable> asyncErrors = new CopyOnWriteArrayList<Throwable>(); 044 045 /** 046 * Tests must specify the timeout for expected outcome of asynchronous 047 * interactions. Longer timeout does not invalidate the correctness of 048 * the implementation, but can in some cases result in longer time to 049 * run the tests. 050 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 051 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 052 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 053 */ 054 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) { 055 this.defaultTimeoutMillis = defaultTimeoutMillis; 056 this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis; 057 this.printlnDebug = printlnDebug; 058 } 059 060 /** 061 * Tests must specify the timeout for expected outcome of asynchronous 062 * interactions. Longer timeout does not invalidate the correctness of 063 * the implementation, but can in some cases result in longer time to 064 * run the tests. 065 * 066 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 067 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 068 */ 069 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) { 070 this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, false); 071 } 072 073 /** 074 * Tests must specify the timeout for expected outcome of asynchronous 075 * interactions. Longer timeout does not invalidate the correctness of 076 * the implementation, but can in some cases result in longer time to 077 * run the tests. 078 * 079 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 080 */ 081 public TestEnvironment(long defaultTimeoutMillis) { 082 this(defaultTimeoutMillis, defaultTimeoutMillis, false); 083 } 084 085 /** 086 * Tests must specify the timeout for expected outcome of asynchronous 087 * interactions. Longer timeout does not invalidate the correctness of 088 * the implementation, but can in some cases result in longer time to 089 * run the tests. 090 * 091 * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} 092 * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. 093 * 094 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 095 * often helpful to pinpoint simple race conditions etc. 096 */ 097 public TestEnvironment(boolean printlnDebug) { 098 this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), printlnDebug); 099 } 100 101 /** 102 * Tests must specify the timeout for expected outcome of asynchronous 103 * interactions. Longer timeout does not invalidate the correctness of 104 * the implementation, but can in some cases result in longer time to 105 * run the tests. 106 * 107 * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} 108 * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. 109 */ 110 public TestEnvironment() { 111 this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis()); 112 } 113 114 /** This timeout is used when waiting for a signal to arrive. */ 115 public long defaultTimeoutMillis() { 116 return defaultTimeoutMillis; 117 } 118 119 /** 120 * This timeout is used when asserting that no further signals are emitted. 121 * Note that this timeout default 122 */ 123 public long defaultNoSignalsTimeoutMillis() { 124 return defaultNoSignalsTimeoutMillis; 125 } 126 127 /** 128 * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value. 129 * 130 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 131 */ 132 public static long envDefaultTimeoutMillis() { 133 final String envMillis = System.getenv(DEFAULT_TIMEOUT_MILLIS_ENV); 134 if (envMillis == null) return DEFAULT_TIMEOUT_MILLIS; 135 else try { 136 return Long.parseLong(envMillis); 137 } catch (NumberFormatException ex) { 138 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_TIMEOUT_MILLIS_ENV, envMillis), ex); 139 } 140 } 141 142 /** 143 * Tries to parse the env variable {@code DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS} as long and returns the value if present OR its default value. 144 * 145 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 146 */ 147 public static long envDefaultNoSignalsTimeoutMillis() { 148 final String envMillis = System.getenv(DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV); 149 if (envMillis == null) return envDefaultTimeoutMillis(); 150 else try { 151 return Long.parseLong(envMillis); 152 } catch (NumberFormatException ex) { 153 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV, envMillis), ex); 154 } 155 } 156 157 /** 158 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 159 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 160 * 161 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 162 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 163 * from the environment using {@code env.dropAsyncError()}. 164 * 165 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 166 */ 167 public void flop(String msg) { 168 try { 169 fail(msg); 170 } catch (Throwable t) { 171 asyncErrors.add(t); 172 } 173 } 174 175 /** 176 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 177 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 178 * 179 * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this. 180 * 181 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 182 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 183 * from the environment using {@code env.dropAsyncError()}. 184 * 185 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 186 */ 187 public void flop(Throwable thr, String msg) { 188 try { 189 fail(msg, thr); 190 } catch (Throwable t) { 191 asyncErrors.add(thr); 192 } 193 } 194 195 /** 196 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 197 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 198 * 199 * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this. 200 * 201 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 202 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 203 * from the environment using {@code env.dropAsyncError()}. 204 * 205 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 206 */ 207 public void flop(Throwable thr) { 208 try { 209 fail(thr.getMessage(), thr); 210 } catch (Throwable t) { 211 asyncErrors.add(thr); 212 } 213 } 214 215 /** 216 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 217 * 218 * This method DOES fail the test right away (it tries to, by throwing an AssertionException), 219 * in such it is different from {@link org.reactivestreams.tck.TestEnvironment#flop} which only records the error. 220 * 221 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 222 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 223 * from the environment using {@code env.dropAsyncError()}. 224 * 225 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 226 */ 227 public <T> T flopAndFail(String msg) { 228 try { 229 fail(msg); 230 } catch (Throwable t) { 231 asyncErrors.add(t); 232 fail(msg, t); 233 } 234 return null; // unreachable, the previous block will always exit by throwing 235 } 236 237 238 239 public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException { 240 subscribe(pub, sub, defaultTimeoutMillis); 241 } 242 243 public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException { 244 pub.subscribe(sub); 245 sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub)); 246 verifyNoAsyncErrorsNoDelay(); 247 } 248 249 public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException { 250 ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this); 251 subscribe(pub, sub, defaultTimeoutMillis()); 252 return sub; 253 } 254 255 public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException { 256 return newManualSubscriber(pub, defaultTimeoutMillis()); 257 } 258 259 public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException { 260 ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this); 261 subscribe(pub, sub, timeoutMillis); 262 return sub; 263 } 264 265 public void clearAsyncErrors() { 266 asyncErrors.clear(); 267 } 268 269 public Throwable dropAsyncError() { 270 try { 271 return asyncErrors.remove(0); 272 } catch (IndexOutOfBoundsException ex) { 273 return null; 274 } 275 } 276 277 /** 278 * Waits for {@link TestEnvironment#defaultTimeoutMillis()} and then verifies that no asynchronous errors 279 * were signalled pior to, or during that time (by calling {@code flop()}). 280 */ 281 public void verifyNoAsyncErrors() { 282 verifyNoAsyncErrors(defaultNoSignalsTimeoutMillis()); 283 } 284 285 /** 286 * This version of {@code verifyNoAsyncErrors} should be used when errors still could be signalled 287 * asynchronously during {@link TestEnvironment#defaultTimeoutMillis()} time. 288 * <p></p> 289 * It will immediatly check if any async errors were signaled (using {@link TestEnvironment#flop(String)}, 290 * and if no errors encountered wait for another default timeout as the errors may yet be signalled. 291 * The initial check is performed in order to fail-fast in case of an already failed test. 292 */ 293 public void verifyNoAsyncErrors(long delay) { 294 try { 295 verifyNoAsyncErrorsNoDelay(); 296 297 Thread.sleep(delay); 298 verifyNoAsyncErrorsNoDelay(); 299 } catch (InterruptedException e) { 300 throw new RuntimeException(e); 301 } 302 } 303 304 /** 305 * Verifies that no asynchronous errors were signalled pior to calling this method (by calling {@code flop()}). 306 * This version of verifyNoAsyncError <b>does not wait before checking for asynchronous errors</b>, and is to be used 307 * for example in tight loops etc. 308 */ 309 public void verifyNoAsyncErrorsNoDelay() { 310 for (Throwable e : asyncErrors) { 311 if (e instanceof AssertionError) { 312 throw (AssertionError) e; 313 } else { 314 fail(String.format("Async error during test execution: %s", e.getMessage()), e); 315 } 316 } 317 } 318 319 /** If {@code TestEnvironment#printlnDebug} is true, print debug message to std out. */ 320 public void debug(String msg) { 321 if (printlnDebug) 322 System.out.printf("[TCK-DEBUG] %s%n", msg); 323 } 324 325 /** 326 * Looks for given {@code method} method in stack trace. 327 * Can be used to answer questions like "was this method called from onComplete?". 328 * 329 * @return the caller's StackTraceElement at which he the looked for method was found in the call stack, EMPTY otherwise 330 */ 331 public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) { 332 final Throwable thr = new Throwable(); // gets the stacktrace 333 334 for (StackTraceElement stackElement : thr.getStackTrace()) { 335 if (stackElement.getMethodName().equals(method)) { 336 return Optional.of(stackElement); 337 } 338 } 339 return Optional.empty(); 340 } 341 342 // ---- classes ---- 343 344 /** 345 * {@link Subscriber} implementation which can be steered by test code and asserted on. 346 */ 347 public static class ManualSubscriber<T> extends TestSubscriber<T> { 348 Receptacle<T> received; 349 350 public ManualSubscriber(TestEnvironment env) { 351 super(env); 352 received = new Receptacle<T>(this.env); 353 } 354 355 @Override 356 public void onNext(T element) { 357 try { 358 received.add(element); 359 } catch (IllegalStateException ex) { 360 // error message refinement 361 throw new SubscriberBufferOverflowException( 362 String.format("Received more than bufferSize (%d) onNext signals. " + 363 "The Publisher probably emited more signals than expected!", 364 received.QUEUE_SIZE), ex); 365 } 366 } 367 368 @Override 369 public void onComplete() { 370 received.complete(); 371 } 372 373 public void request(long elements) { 374 subscription.value().request(elements); 375 } 376 377 public T requestNextElement() throws InterruptedException { 378 return requestNextElement(env.defaultTimeoutMillis()); 379 } 380 381 public T requestNextElement(long timeoutMillis) throws InterruptedException { 382 return requestNextElement(timeoutMillis, "Did not receive expected element"); 383 } 384 385 public T requestNextElement(String errorMsg) throws InterruptedException { 386 return requestNextElement(env.defaultTimeoutMillis(), errorMsg); 387 } 388 389 public T requestNextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 390 request(1); 391 return nextElement(timeoutMillis, errorMsg); 392 } 393 394 public Optional<T> requestNextElementOrEndOfStream() throws InterruptedException { 395 return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 396 } 397 398 public Optional<T> requestNextElementOrEndOfStream(String errorMsg) throws InterruptedException { 399 return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), errorMsg); 400 } 401 402 public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis) throws InterruptedException { 403 return requestNextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 404 } 405 406 public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 407 request(1); 408 return nextElementOrEndOfStream(timeoutMillis, errorMsg); 409 } 410 411 public void requestEndOfStream() throws InterruptedException { 412 requestEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 413 } 414 415 public void requestEndOfStream(long timeoutMillis) throws InterruptedException { 416 requestEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 417 } 418 419 public void requestEndOfStream(String errorMsg) throws InterruptedException { 420 requestEndOfStream(env.defaultTimeoutMillis(), errorMsg); 421 } 422 423 public void requestEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 424 request(1); 425 expectCompletion(timeoutMillis, errorMsg); 426 } 427 428 public List<T> requestNextElements(long elements) throws InterruptedException { 429 request(elements); 430 return nextElements(elements, env.defaultTimeoutMillis()); 431 } 432 433 public List<T> requestNextElements(long elements, long timeoutMillis) throws InterruptedException { 434 request(elements); 435 return nextElements(elements, timeoutMillis, String.format("Did not receive %d expected elements", elements)); 436 } 437 438 public List<T> requestNextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 439 request(elements); 440 return nextElements(elements, timeoutMillis, errorMsg); 441 } 442 443 public T nextElement() throws InterruptedException { 444 return nextElement(env.defaultTimeoutMillis()); 445 } 446 447 public T nextElement(long timeoutMillis) throws InterruptedException { 448 return nextElement(timeoutMillis, "Did not receive expected element"); 449 } 450 451 public T nextElement(String errorMsg) throws InterruptedException { 452 return nextElement(env.defaultTimeoutMillis(), errorMsg); 453 } 454 455 public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 456 return received.next(timeoutMillis, errorMsg); 457 } 458 459 public Optional<T> nextElementOrEndOfStream() throws InterruptedException { 460 return nextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 461 } 462 463 public Optional<T> nextElementOrEndOfStream(long timeoutMillis) throws InterruptedException { 464 return nextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 465 } 466 467 public Optional<T> nextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 468 return received.nextOrEndOfStream(timeoutMillis, errorMsg); 469 } 470 471 public List<T> nextElements(long elements) throws InterruptedException { 472 return nextElements(elements, env.defaultTimeoutMillis(), "Did not receive expected element or completion"); 473 } 474 475 public List<T> nextElements(long elements, String errorMsg) throws InterruptedException { 476 return nextElements(elements, env.defaultTimeoutMillis(), errorMsg); 477 } 478 479 public List<T> nextElements(long elements, long timeoutMillis) throws InterruptedException { 480 return nextElements(elements, timeoutMillis, "Did not receive expected element or completion"); 481 } 482 483 public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 484 return received.nextN(elements, timeoutMillis, errorMsg); 485 } 486 487 public void expectNext(T expected) throws InterruptedException { 488 expectNext(expected, env.defaultTimeoutMillis()); 489 } 490 491 public void expectNext(T expected, long timeoutMillis) throws InterruptedException { 492 T received = nextElement(timeoutMillis, "Did not receive expected element on downstream"); 493 if (!received.equals(expected)) { 494 env.flop(String.format("Expected element %s on downstream but received %s", expected, received)); 495 } 496 } 497 498 public void expectCompletion() throws InterruptedException { 499 expectCompletion(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 500 } 501 502 public void expectCompletion(long timeoutMillis) throws InterruptedException { 503 expectCompletion(timeoutMillis, "Did not receive expected stream completion"); 504 } 505 506 public void expectCompletion(String errorMsg) throws InterruptedException { 507 expectCompletion(env.defaultTimeoutMillis(), errorMsg); 508 } 509 510 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 511 received.expectCompletion(timeoutMillis, errorMsg); 512 } 513 514 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception { 515 expectErrorWithMessage(expected, requiredMessagePart, env.defaultTimeoutMillis()); 516 } 517 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives) throws Exception { 518 expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis()); 519 } 520 521 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 522 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception { 523 expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), timeoutMillis); 524 } 525 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, long timeoutMillis) throws Exception { 526 final E err = expectError(expected, timeoutMillis); 527 final String message = err.getMessage(); 528 529 boolean contains = false; 530 for (String requiredMessagePart : requiredMessagePartAlternatives) 531 if (message.contains(requiredMessagePart)) contains = true; // not short-circuting loop, it is expected to 532 assertTrue(contains, 533 String.format("Got expected exception [%s] but missing message part [%s], was: %s", 534 err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage())); 535 } 536 537 public <E extends Throwable> E expectError(Class<E> expected) throws Exception { 538 return expectError(expected, env.defaultTimeoutMillis()); 539 } 540 541 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception { 542 return expectError(expected, timeoutMillis, String.format("Expected onError(%s)", expected.getName())); 543 } 544 545 public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception { 546 return expectError(expected, env.defaultTimeoutMillis(), errorMsg); 547 } 548 549 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception { 550 return received.expectError(expected, timeoutMillis, errorMsg); 551 } 552 553 public void expectNone() throws InterruptedException { 554 expectNone(env.defaultNoSignalsTimeoutMillis()); 555 } 556 557 public void expectNone(String errMsgPrefix) throws InterruptedException { 558 expectNone(env.defaultNoSignalsTimeoutMillis(), errMsgPrefix); 559 } 560 561 public void expectNone(long withinMillis) throws InterruptedException { 562 expectNone(withinMillis, "Did not expect an element but got element"); 563 } 564 565 public void expectNone(long withinMillis, String errMsgPrefix) throws InterruptedException { 566 received.expectNone(withinMillis, errMsgPrefix); 567 } 568 569 } 570 571 public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> { 572 573 public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) { 574 super(env); 575 } 576 577 @Override 578 public void onNext(T element) { 579 env.debug(String.format("%s::onNext(%s)", this, element)); 580 if (subscription.isCompleted()) { 581 super.onNext(element); 582 } else { 583 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 584 } 585 } 586 587 @Override 588 public void onComplete() { 589 env.debug(this + "::onComplete()"); 590 if (subscription.isCompleted()) { 591 super.onComplete(); 592 } else { 593 env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe"); 594 } 595 } 596 597 @Override 598 public void onSubscribe(Subscription s) { 599 env.debug(String.format("%s::onSubscribe(%s)", this, s)); 600 if (!subscription.isCompleted()) { 601 subscription.complete(s); 602 } else { 603 env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber"); 604 } 605 } 606 607 @Override 608 public void onError(Throwable cause) { 609 env.debug(String.format("%s::onError(%s)", this, cause)); 610 if (subscription.isCompleted()) { 611 super.onError(cause); 612 } else { 613 env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause)); 614 } 615 } 616 } 617 618 /** 619 * Similar to {@link org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport} 620 * but does not accumulate values signalled via <code>onNext</code>, thus it can not be used to assert 621 * values signalled to this subscriber. Instead it may be used to quickly drain a given publisher. 622 */ 623 public static class BlackholeSubscriberWithSubscriptionSupport<T> 624 extends ManualSubscriberWithSubscriptionSupport<T> { 625 626 public BlackholeSubscriberWithSubscriptionSupport(TestEnvironment env) { 627 super(env); 628 } 629 630 @Override 631 public void onNext(T element) { 632 env.debug(String.format("%s::onNext(%s)", this, element)); 633 if (!subscription.isCompleted()) { 634 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 635 } 636 } 637 638 @Override 639 public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 640 throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!"); 641 } 642 643 @Override 644 public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 645 throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!"); 646 } 647 } 648 649 public static class TestSubscriber<T> implements Subscriber<T> { 650 final Promise<Subscription> subscription; 651 652 protected final TestEnvironment env; 653 654 public TestSubscriber(TestEnvironment env) { 655 this.env = env; 656 subscription = new Promise<Subscription>(env); 657 } 658 659 @Override 660 public void onError(Throwable cause) { 661 env.flop(cause, String.format("Unexpected Subscriber::onError(%s)", cause)); 662 } 663 664 @Override 665 public void onComplete() { 666 env.flop("Unexpected Subscriber::onComplete()"); 667 } 668 669 @Override 670 public void onNext(T element) { 671 env.flop(String.format("Unexpected Subscriber::onNext(%s)", element)); 672 } 673 674 @Override 675 public void onSubscribe(Subscription subscription) { 676 env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription)); 677 } 678 679 public void cancel() { 680 if (subscription.isCompleted()) { 681 subscription.value().cancel(); 682 } else { 683 env.flop("Cannot cancel a subscription before having received it"); 684 } 685 } 686 } 687 688 public static class ManualPublisher<T> implements Publisher<T> { 689 protected final TestEnvironment env; 690 691 protected long pendingDemand = 0L; 692 protected Promise<Subscriber<? super T>> subscriber; 693 694 protected final Receptacle<Long> requests; 695 696 protected final Latch cancelled; 697 698 public ManualPublisher(TestEnvironment env) { 699 this.env = env; 700 requests = new Receptacle<Long>(env); 701 cancelled = new Latch(env); 702 subscriber = new Promise<Subscriber<? super T>>(this.env); 703 } 704 705 @Override 706 public void subscribe(Subscriber<? super T> s) { 707 if (!subscriber.isCompleted()) { 708 subscriber.completeImmediatly(s); 709 710 Subscription subs = new Subscription() { 711 @Override 712 public void request(long elements) { 713 requests.add(elements); 714 } 715 716 @Override 717 public void cancel() { 718 cancelled.close(); 719 } 720 }; 721 s.onSubscribe(subs); 722 723 } else { 724 env.flop("TestPublisher doesn't support more than one Subscriber"); 725 } 726 } 727 728 public void sendNext(T element) { 729 if (subscriber.isCompleted()) { 730 subscriber.value().onNext(element); 731 } else { 732 env.flop("Cannot sendNext before having a Subscriber"); 733 } 734 } 735 736 public void sendCompletion() { 737 if (subscriber.isCompleted()) { 738 subscriber.value().onComplete(); 739 } else { 740 env.flop("Cannot sendCompletion before having a Subscriber"); 741 } 742 } 743 744 public void sendError(Throwable cause) { 745 if (subscriber.isCompleted()) { 746 subscriber.value().onError(cause); 747 } else { 748 env.flop("Cannot sendError before having a Subscriber"); 749 } 750 } 751 752 public long expectRequest() throws InterruptedException { 753 return expectRequest(env.defaultTimeoutMillis()); 754 } 755 756 public long expectRequest(long timeoutMillis) throws InterruptedException { 757 long requested = requests.next(timeoutMillis, "Did not receive expected `request` call"); 758 if (requested <= 0) { 759 return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested)); 760 } else { 761 pendingDemand += requested; 762 return requested; 763 } 764 } 765 766 767 public long expectRequest(long timeoutMillis, String errorMessageAddendum) throws InterruptedException { 768 long requested = requests.next(timeoutMillis, String.format("Did not receive expected `request` call. %s", errorMessageAddendum)); 769 if (requested <= 0) { 770 return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested)); 771 } else { 772 pendingDemand += requested; 773 return requested; 774 } 775 } 776 777 public void expectExactRequest(long expected) throws InterruptedException { 778 expectExactRequest(expected, env.defaultTimeoutMillis()); 779 } 780 781 public void expectExactRequest(long expected, long timeoutMillis) throws InterruptedException { 782 long requested = expectRequest(timeoutMillis); 783 if (requested != expected) { 784 env.flop(String.format("Received `request(%d)` on upstream but expected `request(%d)`", requested, expected)); 785 } 786 pendingDemand += requested; 787 } 788 789 public void expectNoRequest() throws InterruptedException { 790 expectNoRequest(env.defaultTimeoutMillis()); 791 } 792 793 public void expectNoRequest(long timeoutMillis) throws InterruptedException { 794 requests.expectNone(timeoutMillis, "Received an unexpected call to: request: "); 795 } 796 797 public void expectCancelling() throws InterruptedException { 798 expectCancelling(env.defaultTimeoutMillis()); 799 } 800 801 public void expectCancelling(long timeoutMillis) throws InterruptedException { 802 cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription"); 803 } 804 805 public boolean isCancelled() throws InterruptedException { 806 return cancelled.isClosed(); 807 } 808 } 809 810 /** 811 * Like a CountDownLatch, but resettable and with some convenience methods 812 */ 813 public static class Latch { 814 private final TestEnvironment env; 815 volatile private CountDownLatch countDownLatch = new CountDownLatch(1); 816 817 public Latch(TestEnvironment env) { 818 this.env = env; 819 } 820 821 public void reOpen() { 822 countDownLatch = new CountDownLatch(1); 823 } 824 825 public boolean isClosed() { 826 return countDownLatch.getCount() == 0; 827 } 828 829 public void close() { 830 countDownLatch.countDown(); 831 } 832 833 public void assertClosed(String openErrorMsg) { 834 if (!isClosed()) { 835 env.flop(new ExpectedClosedLatchException(openErrorMsg)); 836 } 837 } 838 839 public void assertOpen(String closedErrorMsg) { 840 if (isClosed()) { 841 env.flop(new ExpectedOpenLatchException(closedErrorMsg)); 842 } 843 } 844 845 public void expectClose(String notClosedErrorMsg) throws InterruptedException { 846 expectClose(env.defaultTimeoutMillis(), notClosedErrorMsg); 847 } 848 849 public void expectClose(long timeoutMillis, String notClosedErrorMsg) throws InterruptedException { 850 countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); 851 if (countDownLatch.getCount() > 0) { 852 env.flop(String.format("%s within %d ms", notClosedErrorMsg, timeoutMillis)); 853 } 854 } 855 856 static final class ExpectedOpenLatchException extends RuntimeException { 857 public ExpectedOpenLatchException(String message) { 858 super(message); 859 } 860 } 861 862 static final class ExpectedClosedLatchException extends RuntimeException { 863 public ExpectedClosedLatchException(String message) { 864 super(message); 865 } 866 } 867 868 } 869 870 // simple promise for *one* value, which cannot be reset 871 public static class Promise<T> { 872 private final TestEnvironment env; 873 874 public static <T> Promise<T> completed(TestEnvironment env, T value) { 875 Promise<T> promise = new Promise<T>(env); 876 promise.completeImmediatly(value); 877 return promise; 878 } 879 880 public Promise(TestEnvironment env) { 881 this.env = env; 882 } 883 884 private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1); 885 private volatile T _value = null; 886 887 public T value() { 888 if (isCompleted()) { 889 return _value; 890 } else { 891 env.flop("Cannot access promise value before completion"); 892 return null; 893 } 894 } 895 896 public boolean isCompleted() { 897 return _value != null; 898 } 899 900 /** 901 * Allows using expectCompletion to await for completion of the value and complete it _then_ 902 */ 903 public void complete(T value) { 904 abq.add(value); 905 } 906 907 /** 908 * Completes the promise right away, it is not possible to expectCompletion on a Promise completed this way 909 */ 910 public void completeImmediatly(T value) { 911 complete(value); // complete! 912 _value = value; // immediatly! 913 } 914 915 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 916 if (!isCompleted()) { 917 T val = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 918 919 if (val == null) { 920 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 921 } else { 922 _value = val; 923 } 924 } 925 } 926 } 927 928 // a "Promise" for multiple values, which also supports "end-of-stream reached" 929 public static class Receptacle<T> { 930 final int QUEUE_SIZE = 2 * TEST_BUFFER_SIZE; 931 private final TestEnvironment env; 932 933 private final ArrayBlockingQueue<Optional<T>> abq = new ArrayBlockingQueue<Optional<T>>(QUEUE_SIZE); 934 935 private final Latch completedLatch; 936 937 Receptacle(TestEnvironment env) { 938 this.env = env; 939 this.completedLatch = new Latch(env); 940 } 941 942 public void add(T value) { 943 completedLatch.assertOpen(String.format("Unexpected element %s received after stream completed", value)); 944 945 abq.add(Optional.of(value)); 946 } 947 948 public void complete() { 949 completedLatch.assertOpen("Unexpected additional complete signal received!"); 950 completedLatch.close(); 951 952 abq.add(Optional.<T>empty()); 953 } 954 955 public T next(long timeoutMillis, String errorMsg) throws InterruptedException { 956 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 957 958 if (value == null) { 959 return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis)); 960 } else if (value.isDefined()) { 961 return value.get(); 962 } else { 963 return env.flopAndFail("Expected element but got end-of-stream"); 964 } 965 } 966 967 public Optional<T> nextOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 968 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 969 970 if (value == null) { 971 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 972 return Optional.empty(); 973 } 974 975 return value; 976 } 977 978 /** 979 * @param timeoutMillis total timeout time for awaiting all {@code elements} number of elements 980 */ 981 public List<T> nextN(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 982 List<T> result = new LinkedList<T>(); 983 long remaining = elements; 984 long deadline = System.currentTimeMillis() + timeoutMillis; 985 while (remaining > 0) { 986 long remainingMillis = deadline - System.currentTimeMillis(); 987 988 result.add(next(remainingMillis, errorMsg)); 989 remaining--; 990 } 991 992 return result; 993 } 994 995 996 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 997 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 998 999 if (value == null) { 1000 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 1001 } else if (value.isDefined()) { 1002 env.flop(String.format("Expected end-of-stream but got element [%s]", value.get())); 1003 } // else, ok 1004 } 1005 1006 @SuppressWarnings("unchecked") 1007 public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception { 1008 Thread.sleep(timeoutMillis); 1009 1010 if (env.asyncErrors.isEmpty()) { 1011 return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis)); 1012 } else { 1013 // ok, there was an expected error 1014 Throwable thrown = env.asyncErrors.remove(0); 1015 1016 if (clazz.isInstance(thrown)) { 1017 return (E) thrown; 1018 } else { 1019 1020 return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s", 1021 errorMsg, timeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName())); 1022 } 1023 } 1024 } 1025 1026 public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException { 1027 Thread.sleep(withinMillis); 1028 Optional<T> value = abq.poll(); 1029 1030 if (value == null) { 1031 // ok 1032 } else if (value.isDefined()) { 1033 env.flop(String.format("%s [%s]", errorMsgPrefix, value.get())); 1034 } else { 1035 env.flop("Expected no element but got end-of-stream"); 1036 } 1037 } 1038 } 1039} 1040