001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams.tck; 013 014import org.reactivestreams.Publisher; 015import org.reactivestreams.Subscriber; 016import org.reactivestreams.Subscription; 017import org.reactivestreams.tck.flow.support.Optional; 018import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException; 019 020import java.util.Collections; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.concurrent.ArrayBlockingQueue; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicReference; 028 029import static java.util.concurrent.TimeUnit.MILLISECONDS; 030import static java.util.concurrent.TimeUnit.NANOSECONDS; 031import static org.testng.Assert.assertTrue; 032import static org.testng.Assert.fail; 033 034public class TestEnvironment { 035 public static final int TEST_BUFFER_SIZE = 16; 036 037 private static final String DEFAULT_TIMEOUT_MILLIS_ENV = "DEFAULT_TIMEOUT_MILLIS"; 038 private static final long DEFAULT_TIMEOUT_MILLIS = 100; 039 040 private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS"; 041 private static final String DEFAULT_POLL_TIMEOUT_MILLIS_ENV = "DEFAULT_POLL_TIMEOUT_MILLIS_ENV"; 042 043 private final long defaultTimeoutMillis; 044 private final long defaultPollTimeoutMillis; 045 private final long defaultNoSignalsTimeoutMillis; 046 private final boolean printlnDebug; 047 048 private CopyOnWriteArrayList<Throwable> asyncErrors = new CopyOnWriteArrayList<Throwable>(); 049 050 /** 051 * Tests must specify the timeout for expected outcome of asynchronous 052 * interactions. Longer timeout does not invalidate the correctness of 053 * the implementation, but can in some cases result in longer time to 054 * run the tests. 055 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 056 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 057 * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't 058 * preempted by an asynchronous event. 059 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 060 */ 061 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis, 062 boolean printlnDebug) { 063 this.defaultTimeoutMillis = defaultTimeoutMillis; 064 this.defaultPollTimeoutMillis = defaultPollTimeoutMillis; 065 this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis; 066 this.printlnDebug = printlnDebug; 067 } 068 069 /** 070 * Tests must specify the timeout for expected outcome of asynchronous 071 * interactions. Longer timeout does not invalidate the correctness of 072 * the implementation, but can in some cases result in longer time to 073 * run the tests. 074 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 075 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 076 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 077 */ 078 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) { 079 this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultTimeoutMillis, printlnDebug); 080 } 081 082 /** 083 * Tests must specify the timeout for expected outcome of asynchronous 084 * interactions. Longer timeout does not invalidate the correctness of 085 * the implementation, but can in some cases result in longer time to 086 * run the tests. 087 * 088 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 089 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 090 * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't 091 * preempted by an asynchronous event. 092 */ 093 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis) { 094 this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultPollTimeoutMillis, false); 095 } 096 097 /** 098 * Tests must specify the timeout for expected outcome of asynchronous 099 * interactions. Longer timeout does not invalidate the correctness of 100 * the implementation, but can in some cases result in longer time to 101 * run the tests. 102 * 103 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 104 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore 105 */ 106 public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) { 107 this(defaultTimeoutMillis, defaultTimeoutMillis, defaultNoSignalsTimeoutMillis); 108 } 109 110 /** 111 * Tests must specify the timeout for expected outcome of asynchronous 112 * interactions. Longer timeout does not invalidate the correctness of 113 * the implementation, but can in some cases result in longer time to 114 * run the tests. 115 * 116 * @param defaultTimeoutMillis default timeout to be used in all expect* methods 117 */ 118 public TestEnvironment(long defaultTimeoutMillis) { 119 this(defaultTimeoutMillis, defaultTimeoutMillis, defaultTimeoutMillis); 120 } 121 122 /** 123 * Tests must specify the timeout for expected outcome of asynchronous 124 * interactions. Longer timeout does not invalidate the correctness of 125 * the implementation, but can in some cases result in longer time to 126 * run the tests. 127 * 128 * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} 129 * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. 130 * 131 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, 132 * often helpful to pinpoint simple race conditions etc. 133 */ 134 public TestEnvironment(boolean printlnDebug) { 135 this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), envDefaultPollTimeoutMillis(), printlnDebug); 136 } 137 138 /** 139 * Tests must specify the timeout for expected outcome of asynchronous 140 * interactions. Longer timeout does not invalidate the correctness of 141 * the implementation, but can in some cases result in longer time to 142 * run the tests. 143 * 144 * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS} 145 * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used. 146 */ 147 public TestEnvironment() { 148 this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis()); 149 } 150 151 /** This timeout is used when waiting for a signal to arrive. */ 152 public long defaultTimeoutMillis() { 153 return defaultTimeoutMillis; 154 } 155 156 /** 157 * This timeout is used when asserting that no further signals are emitted. 158 * Note that this timeout default 159 */ 160 public long defaultNoSignalsTimeoutMillis() { 161 return defaultNoSignalsTimeoutMillis; 162 } 163 164 /** 165 * The default amount of time to poll for events if {@code defaultTimeoutMillis} isn't preempted by an asynchronous 166 * event. 167 */ 168 public long defaultPollTimeoutMillis() { 169 return defaultPollTimeoutMillis; 170 } 171 172 /** 173 * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value. 174 * 175 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 176 */ 177 public static long envDefaultTimeoutMillis() { 178 final String envMillis = System.getenv(DEFAULT_TIMEOUT_MILLIS_ENV); 179 if (envMillis == null) return DEFAULT_TIMEOUT_MILLIS; 180 else try { 181 return Long.parseLong(envMillis); 182 } catch (NumberFormatException ex) { 183 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_TIMEOUT_MILLIS_ENV, envMillis), ex); 184 } 185 } 186 187 /** 188 * Tries to parse the env variable {@code DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS} as long and returns the value if present OR its default value. 189 * 190 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 191 */ 192 public static long envDefaultNoSignalsTimeoutMillis() { 193 final String envMillis = System.getenv(DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV); 194 if (envMillis == null) return envDefaultTimeoutMillis(); 195 else try { 196 return Long.parseLong(envMillis); 197 } catch (NumberFormatException ex) { 198 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV, envMillis), ex); 199 } 200 } 201 202 /** 203 * Tries to parse the env variable {@code DEFAULT_POLL_TIMEOUT_MILLIS_ENV} as long and returns the value if present OR its default value. 204 * 205 * @throws java.lang.IllegalArgumentException when unable to parse the env variable 206 */ 207 public static long envDefaultPollTimeoutMillis() { 208 final String envMillis = System.getenv(DEFAULT_POLL_TIMEOUT_MILLIS_ENV); 209 if (envMillis == null) return envDefaultTimeoutMillis(); 210 else try { 211 return Long.parseLong(envMillis); 212 } catch (NumberFormatException ex) { 213 throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_POLL_TIMEOUT_MILLIS_ENV, envMillis), ex); 214 } 215 } 216 217 /** 218 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 219 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 220 * 221 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 222 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 223 * from the environment using {@code env.dropAsyncError()}. 224 * 225 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 226 */ 227 public void flop(String msg) { 228 try { 229 fail(msg); 230 } catch (Throwable t) { 231 asyncErrors.add(t); 232 } 233 } 234 235 /** 236 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 237 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 238 * 239 * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this. 240 * 241 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 242 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 243 * from the environment using {@code env.dropAsyncError()}. 244 * 245 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 246 */ 247 public void flop(Throwable thr, String msg) { 248 try { 249 fail(msg, thr); 250 } catch (Throwable t) { 251 asyncErrors.add(thr); 252 } 253 } 254 255 /** 256 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 257 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. 258 * 259 * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this. 260 * 261 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 262 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 263 * from the environment using {@code env.dropAsyncError()}. 264 * 265 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 266 */ 267 public void flop(Throwable thr) { 268 try { 269 fail(thr.getMessage(), thr); 270 } catch (Throwable t) { 271 asyncErrors.add(thr); 272 } 273 } 274 275 /** 276 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. 277 * 278 * This method DOES fail the test right away (it tries to, by throwing an AssertionException), 279 * in such it is different from {@link org.reactivestreams.tck.TestEnvironment#flop} which only records the error. 280 * 281 * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution. 282 * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly 283 * from the environment using {@code env.dropAsyncError()}. 284 * 285 * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()} 286 */ 287 public <T> T flopAndFail(String msg) { 288 try { 289 fail(msg); 290 } catch (Throwable t) { 291 asyncErrors.add(t); 292 fail(msg, t); 293 } 294 return null; // unreachable, the previous block will always exit by throwing 295 } 296 297 298 299 public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException { 300 subscribe(pub, sub, defaultTimeoutMillis); 301 } 302 303 public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException { 304 pub.subscribe(sub); 305 sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub)); 306 verifyNoAsyncErrorsNoDelay(); 307 } 308 309 public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException { 310 ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this); 311 subscribe(pub, sub, defaultTimeoutMillis()); 312 return sub; 313 } 314 315 public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException { 316 return newManualSubscriber(pub, defaultTimeoutMillis()); 317 } 318 319 public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException { 320 ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this); 321 subscribe(pub, sub, timeoutMillis); 322 return sub; 323 } 324 325 public void clearAsyncErrors() { 326 asyncErrors.clear(); 327 } 328 329 public Throwable dropAsyncError() { 330 try { 331 return asyncErrors.remove(0); 332 } catch (IndexOutOfBoundsException ex) { 333 return null; 334 } 335 } 336 337 /** 338 * Waits for {@link TestEnvironment#defaultNoSignalsTimeoutMillis()} and then verifies that no asynchronous errors 339 * were signalled pior to, or during that time (by calling {@code flop()}). 340 */ 341 public void verifyNoAsyncErrors() { 342 verifyNoAsyncErrors(defaultNoSignalsTimeoutMillis()); 343 } 344 345 /** 346 * This version of {@code verifyNoAsyncErrors} should be used when errors still could be signalled 347 * asynchronously during {@link TestEnvironment#defaultTimeoutMillis()} time. 348 * <p></p> 349 * It will immediatly check if any async errors were signaled (using {@link TestEnvironment#flop(String)}, 350 * and if no errors encountered wait for another default timeout as the errors may yet be signalled. 351 * The initial check is performed in order to fail-fast in case of an already failed test. 352 */ 353 public void verifyNoAsyncErrors(long delay) { 354 try { 355 verifyNoAsyncErrorsNoDelay(); 356 357 Thread.sleep(delay); 358 verifyNoAsyncErrorsNoDelay(); 359 } catch (InterruptedException e) { 360 throw new RuntimeException(e); 361 } 362 } 363 364 /** 365 * Verifies that no asynchronous errors were signalled pior to calling this method (by calling {@code flop()}). 366 * This version of verifyNoAsyncError <b>does not wait before checking for asynchronous errors</b>, and is to be used 367 * for example in tight loops etc. 368 */ 369 public void verifyNoAsyncErrorsNoDelay() { 370 for (Throwable e : asyncErrors) { 371 if (e instanceof AssertionError) { 372 throw (AssertionError) e; 373 } else { 374 fail(String.format("Async error during test execution: %s", e.getMessage()), e); 375 } 376 } 377 } 378 379 /** If {@code TestEnvironment#printlnDebug} is true, print debug message to std out. */ 380 public void debug(String msg) { 381 if (debugEnabled()) { 382 System.out.printf("[TCK-DEBUG] %s%n", msg); 383 } 384 } 385 386 public final boolean debugEnabled() { 387 return printlnDebug; 388 } 389 390 /** 391 * Looks for given {@code method} method in stack trace. 392 * Can be used to answer questions like "was this method called from onComplete?". 393 * 394 * @return the caller's StackTraceElement at which he the looked for method was found in the call stack, EMPTY otherwise 395 */ 396 public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) { 397 final Throwable thr = new Throwable(); // gets the stacktrace 398 399 for (StackTraceElement stackElement : thr.getStackTrace()) { 400 if (stackElement.getMethodName().equals(method)) { 401 return Optional.of(stackElement); 402 } 403 } 404 return Optional.empty(); 405 } 406 407 // ---- classes ---- 408 409 /** 410 * {@link Subscriber} implementation which can be steered by test code and asserted on. 411 */ 412 public static class ManualSubscriber<T> extends TestSubscriber<T> { 413 Receptacle<T> received; 414 415 public ManualSubscriber(TestEnvironment env) { 416 super(env); 417 received = new Receptacle<T>(this.env); 418 } 419 420 @Override 421 public void onNext(T element) { 422 try { 423 received.add(element); 424 } catch (IllegalStateException ex) { 425 // error message refinement 426 throw new SubscriberBufferOverflowException( 427 String.format("Received more than bufferSize (%d) onNext signals. " + 428 "The Publisher probably emited more signals than expected!", 429 received.QUEUE_SIZE), ex); 430 } 431 } 432 433 @Override 434 public void onComplete() { 435 received.complete(); 436 } 437 438 public void request(long elements) { 439 subscription.value().request(elements); 440 } 441 442 public T requestNextElement() throws InterruptedException { 443 return requestNextElement(env.defaultTimeoutMillis()); 444 } 445 446 public T requestNextElement(long timeoutMillis) throws InterruptedException { 447 return requestNextElement(timeoutMillis, "Did not receive expected element"); 448 } 449 450 public T requestNextElement(String errorMsg) throws InterruptedException { 451 return requestNextElement(env.defaultTimeoutMillis(), errorMsg); 452 } 453 454 public T requestNextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 455 request(1); 456 return nextElement(timeoutMillis, errorMsg); 457 } 458 459 public Optional<T> requestNextElementOrEndOfStream() throws InterruptedException { 460 return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 461 } 462 463 public Optional<T> requestNextElementOrEndOfStream(String errorMsg) throws InterruptedException { 464 return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), errorMsg); 465 } 466 467 public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis) throws InterruptedException { 468 return requestNextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 469 } 470 471 public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 472 request(1); 473 return nextElementOrEndOfStream(timeoutMillis, errorMsg); 474 } 475 476 public void requestEndOfStream() throws InterruptedException { 477 requestEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 478 } 479 480 public void requestEndOfStream(long timeoutMillis) throws InterruptedException { 481 requestEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 482 } 483 484 public void requestEndOfStream(String errorMsg) throws InterruptedException { 485 requestEndOfStream(env.defaultTimeoutMillis(), errorMsg); 486 } 487 488 public void requestEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 489 request(1); 490 expectCompletion(timeoutMillis, errorMsg); 491 } 492 493 public List<T> requestNextElements(long elements) throws InterruptedException { 494 request(elements); 495 return nextElements(elements, env.defaultTimeoutMillis()); 496 } 497 498 public List<T> requestNextElements(long elements, long timeoutMillis) throws InterruptedException { 499 request(elements); 500 return nextElements(elements, timeoutMillis, String.format("Did not receive %d expected elements", elements)); 501 } 502 503 public List<T> requestNextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 504 request(elements); 505 return nextElements(elements, timeoutMillis, errorMsg); 506 } 507 508 public T nextElement() throws InterruptedException { 509 return nextElement(env.defaultTimeoutMillis()); 510 } 511 512 public T nextElement(long timeoutMillis) throws InterruptedException { 513 return nextElement(timeoutMillis, "Did not receive expected element"); 514 } 515 516 public T nextElement(String errorMsg) throws InterruptedException { 517 return nextElement(env.defaultTimeoutMillis(), errorMsg); 518 } 519 520 public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 521 return received.next(timeoutMillis, errorMsg); 522 } 523 524 public Optional<T> nextElementOrEndOfStream() throws InterruptedException { 525 return nextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 526 } 527 528 public Optional<T> nextElementOrEndOfStream(long timeoutMillis) throws InterruptedException { 529 return nextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion"); 530 } 531 532 public Optional<T> nextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 533 return received.nextOrEndOfStream(timeoutMillis, errorMsg); 534 } 535 536 public List<T> nextElements(long elements) throws InterruptedException { 537 return nextElements(elements, env.defaultTimeoutMillis(), "Did not receive expected element or completion"); 538 } 539 540 public List<T> nextElements(long elements, String errorMsg) throws InterruptedException { 541 return nextElements(elements, env.defaultTimeoutMillis(), errorMsg); 542 } 543 544 public List<T> nextElements(long elements, long timeoutMillis) throws InterruptedException { 545 return nextElements(elements, timeoutMillis, "Did not receive expected element or completion"); 546 } 547 548 public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 549 return received.nextN(elements, timeoutMillis, errorMsg); 550 } 551 552 public void expectNext(T expected) throws InterruptedException { 553 expectNext(expected, env.defaultTimeoutMillis()); 554 } 555 556 public void expectNext(T expected, long timeoutMillis) throws InterruptedException { 557 T received = nextElement(timeoutMillis, "Did not receive expected element on downstream"); 558 if (!received.equals(expected)) { 559 env.flop(String.format("Expected element %s on downstream but received %s", expected, received)); 560 } 561 } 562 563 public void expectCompletion() throws InterruptedException { 564 expectCompletion(env.defaultTimeoutMillis(), "Did not receive expected stream completion"); 565 } 566 567 public void expectCompletion(long timeoutMillis) throws InterruptedException { 568 expectCompletion(timeoutMillis, "Did not receive expected stream completion"); 569 } 570 571 public void expectCompletion(String errorMsg) throws InterruptedException { 572 expectCompletion(env.defaultTimeoutMillis(), errorMsg); 573 } 574 575 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 576 received.expectCompletion(timeoutMillis, errorMsg); 577 } 578 579 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception { 580 expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis()); 581 } 582 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives) throws Exception { 583 expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis()); 584 } 585 586 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 587 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception { 588 expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), timeoutMillis); 589 } 590 591 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, long timeoutMillis) throws Exception { 592 expectErrorWithMessage(expected, requiredMessagePartAlternatives, timeoutMillis, timeoutMillis); 593 } 594 595 public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, 596 long totalTimeoutMillis, long pollTimeoutMillis) throws Exception { 597 final E err = expectError(expected, totalTimeoutMillis, pollTimeoutMillis); 598 final String message = err.getMessage(); 599 600 boolean contains = false; 601 for (String requiredMessagePart : requiredMessagePartAlternatives) 602 if (message.contains(requiredMessagePart)) contains = true; // not short-circuting loop, it is expected to 603 assertTrue(contains, 604 String.format("Got expected exception [%s] but missing message part [%s], was: %s", 605 err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage())); 606 } 607 608 public <E extends Throwable> E expectError(Class<E> expected) throws Exception { 609 return expectError(expected, env.defaultTimeoutMillis()); 610 } 611 612 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception { 613 return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis()); 614 } 615 616 public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception { 617 return expectError(expected, env.defaultTimeoutMillis(), errorMsg); 618 } 619 620 public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception { 621 return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis(), errorMsg); 622 } 623 624 public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis) throws Exception { 625 return expectError(expected, totalTimeoutMillis, pollTimeoutMillis, String.format("Expected onError(%s)", expected.getName())); 626 } 627 628 public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis, 629 String errorMsg) throws Exception { 630 return received.expectError(expected, totalTimeoutMillis, pollTimeoutMillis, errorMsg); 631 } 632 633 public void expectNone() throws InterruptedException { 634 expectNone(env.defaultNoSignalsTimeoutMillis()); 635 } 636 637 public void expectNone(String errMsgPrefix) throws InterruptedException { 638 expectNone(env.defaultNoSignalsTimeoutMillis(), errMsgPrefix); 639 } 640 641 public void expectNone(long withinMillis) throws InterruptedException { 642 expectNone(withinMillis, "Did not expect an element but got element"); 643 } 644 645 public void expectNone(long withinMillis, String errMsgPrefix) throws InterruptedException { 646 received.expectNone(withinMillis, errMsgPrefix); 647 } 648 649 } 650 651 public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> { 652 653 public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) { 654 super(env); 655 } 656 657 @Override 658 public void onNext(T element) { 659 if (env.debugEnabled()) { 660 env.debug(String.format("%s::onNext(%s)", this, element)); 661 } 662 if (subscription.isCompleted()) { 663 super.onNext(element); 664 } else { 665 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 666 } 667 } 668 669 @Override 670 public void onComplete() { 671 if (env.debugEnabled()) { 672 env.debug(this + "::onComplete()"); 673 } 674 if (subscription.isCompleted()) { 675 super.onComplete(); 676 } else { 677 env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe"); 678 } 679 } 680 681 @Override 682 public void onSubscribe(Subscription s) { 683 if (env.debugEnabled()) { 684 env.debug(String.format("%s::onSubscribe(%s)", this, s)); 685 } 686 if (!subscription.isCompleted()) { 687 subscription.complete(s); 688 } else { 689 env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber"); 690 } 691 } 692 693 @Override 694 public void onError(Throwable cause) { 695 if (env.debugEnabled()) { 696 env.debug(String.format("%s::onError(%s)", this, cause)); 697 } 698 if (subscription.isCompleted()) { 699 super.onError(cause); 700 } else { 701 env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause)); 702 } 703 } 704 } 705 706 /** 707 * Similar to {@link org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport} 708 * but does not accumulate values signalled via <code>onNext</code>, thus it can not be used to assert 709 * values signalled to this subscriber. Instead it may be used to quickly drain a given publisher. 710 */ 711 public static class BlackholeSubscriberWithSubscriptionSupport<T> 712 extends ManualSubscriberWithSubscriptionSupport<T> { 713 714 public BlackholeSubscriberWithSubscriptionSupport(TestEnvironment env) { 715 super(env); 716 } 717 718 @Override 719 public void onNext(T element) { 720 if (env.debugEnabled()) { 721 env.debug(String.format("%s::onNext(%s)", this, element)); 722 } 723 if (!subscription.isCompleted()) { 724 env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element)); 725 } 726 } 727 728 @Override 729 public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException { 730 throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!"); 731 } 732 733 @Override 734 public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 735 throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!"); 736 } 737 } 738 739 public static class TestSubscriber<T> implements Subscriber<T> { 740 final Promise<Subscription> subscription; 741 742 protected final TestEnvironment env; 743 744 public TestSubscriber(TestEnvironment env) { 745 this.env = env; 746 subscription = new Promise<Subscription>(env); 747 } 748 749 @Override 750 public void onError(Throwable cause) { 751 env.flop(cause, String.format("Unexpected Subscriber::onError(%s)", cause)); 752 } 753 754 @Override 755 public void onComplete() { 756 env.flop("Unexpected Subscriber::onComplete()"); 757 } 758 759 @Override 760 public void onNext(T element) { 761 env.flop(String.format("Unexpected Subscriber::onNext(%s)", element)); 762 } 763 764 @Override 765 public void onSubscribe(Subscription subscription) { 766 env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription)); 767 } 768 769 public void cancel() { 770 if (subscription.isCompleted()) { 771 subscription.value().cancel(); 772 } else { 773 env.flop("Cannot cancel a subscription before having received it"); 774 } 775 } 776 } 777 778 public static class ManualPublisher<T> implements Publisher<T> { 779 protected final TestEnvironment env; 780 781 protected long pendingDemand = 0L; 782 protected Promise<Subscriber<? super T>> subscriber; 783 784 protected final Receptacle<Long> requests; 785 786 protected final Latch cancelled; 787 788 public ManualPublisher(TestEnvironment env) { 789 this.env = env; 790 requests = new Receptacle<Long>(env); 791 cancelled = new Latch(env); 792 subscriber = new Promise<Subscriber<? super T>>(this.env); 793 } 794 795 @Override 796 public void subscribe(Subscriber<? super T> s) { 797 if (!subscriber.isCompleted()) { 798 subscriber.completeImmediatly(s); 799 800 Subscription subs = new Subscription() { 801 @Override 802 public void request(long elements) { 803 requests.add(elements); 804 } 805 806 @Override 807 public void cancel() { 808 cancelled.close(); 809 } 810 }; 811 s.onSubscribe(subs); 812 813 } else { 814 env.flop("TestPublisher doesn't support more than one Subscriber"); 815 } 816 } 817 818 public void sendNext(T element) { 819 if (subscriber.isCompleted()) { 820 subscriber.value().onNext(element); 821 } else { 822 env.flop("Cannot sendNext before having a Subscriber"); 823 } 824 } 825 826 public void sendCompletion() { 827 if (subscriber.isCompleted()) { 828 subscriber.value().onComplete(); 829 } else { 830 env.flop("Cannot sendCompletion before having a Subscriber"); 831 } 832 } 833 834 public void sendError(Throwable cause) { 835 if (subscriber.isCompleted()) { 836 subscriber.value().onError(cause); 837 } else { 838 env.flop("Cannot sendError before having a Subscriber"); 839 } 840 } 841 842 public long expectRequest() throws InterruptedException { 843 return expectRequest(env.defaultTimeoutMillis()); 844 } 845 846 public long expectRequest(long timeoutMillis) throws InterruptedException { 847 long requested = requests.next(timeoutMillis, "Did not receive expected `request` call"); 848 if (requested <= 0) { 849 return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested)); 850 } else { 851 pendingDemand += requested; 852 return requested; 853 } 854 } 855 856 857 public long expectRequest(long timeoutMillis, String errorMessageAddendum) throws InterruptedException { 858 long requested = requests.next(timeoutMillis, String.format("Did not receive expected `request` call. %s", errorMessageAddendum)); 859 if (requested <= 0) { 860 return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested)); 861 } else { 862 pendingDemand += requested; 863 return requested; 864 } 865 } 866 867 public void expectExactRequest(long expected) throws InterruptedException { 868 expectExactRequest(expected, env.defaultTimeoutMillis()); 869 } 870 871 public void expectExactRequest(long expected, long timeoutMillis) throws InterruptedException { 872 long requested = expectRequest(timeoutMillis); 873 if (requested != expected) { 874 env.flop(String.format("Received `request(%d)` on upstream but expected `request(%d)`", requested, expected)); 875 } 876 pendingDemand += requested; 877 } 878 879 public void expectNoRequest() throws InterruptedException { 880 expectNoRequest(env.defaultTimeoutMillis()); 881 } 882 883 public void expectNoRequest(long timeoutMillis) throws InterruptedException { 884 requests.expectNone(timeoutMillis, "Received an unexpected call to: request: "); 885 } 886 887 public void expectCancelling() throws InterruptedException { 888 expectCancelling(env.defaultTimeoutMillis()); 889 } 890 891 public void expectCancelling(long timeoutMillis) throws InterruptedException { 892 cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription"); 893 } 894 895 public boolean isCancelled() throws InterruptedException { 896 return cancelled.isClosed(); 897 } 898 } 899 900 /** 901 * Like a CountDownLatch, but resettable and with some convenience methods 902 */ 903 public static class Latch { 904 private final TestEnvironment env; 905 volatile private CountDownLatch countDownLatch = new CountDownLatch(1); 906 907 public Latch(TestEnvironment env) { 908 this.env = env; 909 } 910 911 public void reOpen() { 912 countDownLatch = new CountDownLatch(1); 913 } 914 915 public boolean isClosed() { 916 return countDownLatch.getCount() == 0; 917 } 918 919 public void close() { 920 countDownLatch.countDown(); 921 } 922 923 public void assertClosed(String openErrorMsg) { 924 if (!isClosed()) { 925 env.flop(new ExpectedClosedLatchException(openErrorMsg)); 926 } 927 } 928 929 public void assertOpen(String closedErrorMsg) { 930 if (isClosed()) { 931 env.flop(new ExpectedOpenLatchException(closedErrorMsg)); 932 } 933 } 934 935 public void expectClose(String notClosedErrorMsg) throws InterruptedException { 936 expectClose(env.defaultTimeoutMillis(), notClosedErrorMsg); 937 } 938 939 public void expectClose(long timeoutMillis, String notClosedErrorMsg) throws InterruptedException { 940 countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); 941 if (countDownLatch.getCount() > 0) { 942 env.flop(String.format("%s within %d ms", notClosedErrorMsg, timeoutMillis)); 943 } 944 } 945 946 static final class ExpectedOpenLatchException extends RuntimeException { 947 public ExpectedOpenLatchException(String message) { 948 super(message); 949 } 950 } 951 952 static final class ExpectedClosedLatchException extends RuntimeException { 953 public ExpectedClosedLatchException(String message) { 954 super(message); 955 } 956 } 957 958 } 959 960 // simple promise for *one* value, which cannot be reset 961 public static class Promise<T> { 962 private final TestEnvironment env; 963 964 public static <T> Promise<T> completed(TestEnvironment env, T value) { 965 Promise<T> promise = new Promise<T>(env); 966 promise.completeImmediatly(value); 967 return promise; 968 } 969 970 public Promise(TestEnvironment env) { 971 this.env = env; 972 } 973 974 private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1); 975 private AtomicReference<T> _value = new AtomicReference<T>(); 976 977 public T value() { 978 final T value = _value.get(); 979 if (value != null) { 980 return value; 981 } else { 982 env.flop("Cannot access promise value before completion"); 983 return null; 984 } 985 } 986 987 public boolean isCompleted() { 988 return _value.get() != null; 989 } 990 991 /** 992 * Allows using expectCompletion to await for completion of the value and complete it _then_ 993 */ 994 public void complete(T value) { 995 if (_value.compareAndSet(null, value)) { 996 // we add the value to the queue such to wake up any expectCompletion which was triggered before complete() was called 997 abq.add(value); 998 } else { 999 env.flop(String.format("Cannot complete a promise more than once! Present value: %s, attempted to set: %s", _value.get(), value)); 1000 } 1001 } 1002 1003 /** 1004 * Same as complete. 1005 * 1006 * Keeping this method for binary compatibility. 1007 */ 1008 public void completeImmediatly(T value) { 1009 complete(value); 1010 } 1011 1012 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 1013 if (!isCompleted()) { 1014 T val = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 1015 1016 if (val == null) { 1017 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 1018 } 1019 } 1020 } 1021 } 1022 1023 // a "Promise" for multiple values, which also supports "end-of-stream reached" 1024 public static class Receptacle<T> { 1025 final int QUEUE_SIZE = 2 * TEST_BUFFER_SIZE; 1026 private final TestEnvironment env; 1027 1028 private final ArrayBlockingQueue<Optional<T>> abq = new ArrayBlockingQueue<Optional<T>>(QUEUE_SIZE); 1029 1030 private final Latch completedLatch; 1031 1032 Receptacle(TestEnvironment env) { 1033 this.env = env; 1034 this.completedLatch = new Latch(env); 1035 } 1036 1037 public void add(T value) { 1038 completedLatch.assertOpen(String.format("Unexpected element %s received after stream completed", value)); 1039 1040 abq.add(Optional.of(value)); 1041 } 1042 1043 public void complete() { 1044 completedLatch.assertOpen("Unexpected additional complete signal received!"); 1045 completedLatch.close(); 1046 1047 abq.add(Optional.<T>empty()); 1048 } 1049 1050 public T next(long timeoutMillis, String errorMsg) throws InterruptedException { 1051 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 1052 1053 if (value == null) { 1054 return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis)); 1055 } else if (value.isDefined()) { 1056 return value.get(); 1057 } else { 1058 return env.flopAndFail("Expected element but got end-of-stream"); 1059 } 1060 } 1061 1062 public Optional<T> nextOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException { 1063 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 1064 1065 if (value == null) { 1066 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 1067 return Optional.empty(); 1068 } 1069 1070 return value; 1071 } 1072 1073 /** 1074 * @param timeoutMillis total timeout time for awaiting all {@code elements} number of elements 1075 */ 1076 public List<T> nextN(long elements, long timeoutMillis, String errorMsg) throws InterruptedException { 1077 List<T> result = new LinkedList<T>(); 1078 long remaining = elements; 1079 long deadline = System.currentTimeMillis() + timeoutMillis; 1080 while (remaining > 0) { 1081 long remainingMillis = deadline - System.currentTimeMillis(); 1082 1083 result.add(next(remainingMillis, errorMsg)); 1084 remaining--; 1085 } 1086 1087 return result; 1088 } 1089 1090 1091 public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException { 1092 Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); 1093 1094 if (value == null) { 1095 env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis)); 1096 } else if (value.isDefined()) { 1097 env.flop(String.format("Expected end-of-stream but got element [%s]", value.get())); 1098 } // else, ok 1099 } 1100 1101 /** 1102 * @deprecated Deprecated in favor of {@link #expectError(Class, long, long, String)}. 1103 */ 1104 @Deprecated 1105 public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception { 1106 return expectError(clazz, timeoutMillis, timeoutMillis, errorMsg); 1107 } 1108 1109 @SuppressWarnings("unchecked") 1110 final <E extends Throwable> E expectError(Class<E> clazz, final long totalTimeoutMillis, 1111 long pollTimeoutMillis, 1112 String errorMsg) throws Exception { 1113 long totalTimeoutRemainingNs = MILLISECONDS.toNanos(totalTimeoutMillis); 1114 long timeStampANs = System.nanoTime(); 1115 long timeStampBNs; 1116 1117 for (;;) { 1118 Thread.sleep(Math.min(pollTimeoutMillis, NANOSECONDS.toMillis(totalTimeoutRemainingNs))); 1119 1120 if (env.asyncErrors.isEmpty()) { 1121 timeStampBNs = System.nanoTime(); 1122 totalTimeoutRemainingNs =- timeStampBNs - timeStampANs; 1123 timeStampANs = timeStampBNs; 1124 1125 if (totalTimeoutRemainingNs <= 0) { 1126 return env.flopAndFail(String.format("%s within %d ms", errorMsg, totalTimeoutMillis)); 1127 } 1128 } else { 1129 // ok, there was an expected error 1130 Throwable thrown = env.asyncErrors.remove(0); 1131 1132 if (clazz.isInstance(thrown)) { 1133 return (E) thrown; 1134 } else { 1135 1136 return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s", 1137 errorMsg, totalTimeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName())); 1138 } 1139 } 1140 } 1141 } 1142 1143 public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException { 1144 Thread.sleep(withinMillis); 1145 Optional<T> value = abq.poll(); 1146 1147 if (value == null) { 1148 // ok 1149 } else if (value.isDefined()) { 1150 env.flop(String.format("%s [%s]", errorMsgPrefix, value.get())); 1151 } else { 1152 env.flop("Expected no element but got end-of-stream"); 1153 } 1154 } 1155 } 1156} 1157