001/***************************************************
002 * Licensed under MIT No Attribution (SPDX: MIT-0) *
003 ***************************************************/
004
005package org.reactivestreams.tck;
006
007import org.reactivestreams.Publisher;
008import org.reactivestreams.Subscriber;
009import org.reactivestreams.Subscription;
010import org.reactivestreams.tck.flow.support.Optional;
011import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException;
012
013import java.util.Collections;
014import java.util.LinkedList;
015import java.util.List;
016import java.util.concurrent.ArrayBlockingQueue;
017import java.util.concurrent.CopyOnWriteArrayList;
018import java.util.concurrent.CountDownLatch;
019import java.util.concurrent.TimeUnit;
020import java.util.concurrent.atomic.AtomicReference;
021
022import static java.util.concurrent.TimeUnit.MILLISECONDS;
023import static java.util.concurrent.TimeUnit.NANOSECONDS;
024import static org.testng.Assert.assertTrue;
025import static org.testng.Assert.fail;
026
027public class TestEnvironment {
028  public static final int TEST_BUFFER_SIZE = 16;
029
030  private static final String DEFAULT_TIMEOUT_MILLIS_ENV = "DEFAULT_TIMEOUT_MILLIS";
031  private static final long DEFAULT_TIMEOUT_MILLIS = 100;
032
033  private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS";
034  private static final String DEFAULT_POLL_TIMEOUT_MILLIS_ENV = "DEFAULT_POLL_TIMEOUT_MILLIS_ENV";
035
036  private final long defaultTimeoutMillis;
037  private final long defaultPollTimeoutMillis;
038  private final long defaultNoSignalsTimeoutMillis;
039  private final boolean printlnDebug;
040
041  private CopyOnWriteArrayList<Throwable> asyncErrors = new CopyOnWriteArrayList<Throwable>();
042
043  /**
044   * Tests must specify the timeout for expected outcome of asynchronous
045   * interactions. Longer timeout does not invalidate the correctness of
046   * the implementation, but can in some cases result in longer time to
047   * run the tests.
048   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
049   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
050   * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
051    *                                preempted by an asynchronous event.
052   * @param printlnDebug         if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
053   */
054  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis,
055                         boolean printlnDebug) {
056    this.defaultTimeoutMillis = defaultTimeoutMillis;
057    this.defaultPollTimeoutMillis = defaultPollTimeoutMillis;
058    this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis;
059    this.printlnDebug = printlnDebug;
060  }
061
062  /**
063   * Tests must specify the timeout for expected outcome of asynchronous
064   * interactions. Longer timeout does not invalidate the correctness of
065   * the implementation, but can in some cases result in longer time to
066   * run the tests.
067   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
068   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
069   * @param printlnDebug         if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
070   */
071  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) {
072    this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultTimeoutMillis, printlnDebug);
073  }
074
075  /**
076   * Tests must specify the timeout for expected outcome of asynchronous
077   * interactions. Longer timeout does not invalidate the correctness of
078   * the implementation, but can in some cases result in longer time to
079   * run the tests.
080   *
081   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
082   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
083   * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
084   *                                 preempted by an asynchronous event.
085   */
086  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis) {
087      this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultPollTimeoutMillis, false);
088  }
089
090  /**
091   * Tests must specify the timeout for expected outcome of asynchronous
092   * interactions. Longer timeout does not invalidate the correctness of
093   * the implementation, but can in some cases result in longer time to
094   * run the tests.
095   *
096   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
097   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
098   */
099  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) {
100    this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultTimeoutMillis);
101  }
102
103  /**
104   * Tests must specify the timeout for expected outcome of asynchronous
105   * interactions. Longer timeout does not invalidate the correctness of
106   * the implementation, but can in some cases result in longer time to
107   * run the tests.
108   *
109   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
110   */
111  public TestEnvironment(long defaultTimeoutMillis) {
112    this(defaultTimeoutMillis, defaultTimeoutMillis, defaultTimeoutMillis);
113  }
114
115  /**
116   * Tests must specify the timeout for expected outcome of asynchronous
117   * interactions. Longer timeout does not invalidate the correctness of
118   * the implementation, but can in some cases result in longer time to
119   * run the tests.
120   *
121   * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS}
122   * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used.
123   *
124   * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
125   *                     often helpful to pinpoint simple race conditions etc.
126   */
127  public TestEnvironment(boolean printlnDebug) {
128    this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), envDefaultPollTimeoutMillis(), printlnDebug);
129  }
130
131  /**
132   * Tests must specify the timeout for expected outcome of asynchronous
133   * interactions. Longer timeout does not invalidate the correctness of
134   * the implementation, but can in some cases result in longer time to
135   * run the tests.
136   *
137   * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS}
138   * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used.
139   */
140  public TestEnvironment() {
141    this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis());
142  }
143
144  /** This timeout is used when waiting for a signal to arrive. */
145  public long defaultTimeoutMillis() {
146    return defaultTimeoutMillis;
147  }
148
149  /**
150   * This timeout is used when asserting that no further signals are emitted.
151   * Note that this timeout default
152   */
153  public long defaultNoSignalsTimeoutMillis() {
154    return defaultNoSignalsTimeoutMillis;
155  }
156
157  /**
158   * The default amount of time to poll for events if {@code defaultTimeoutMillis} isn't preempted by an asynchronous
159   * event.
160   */
161  public long defaultPollTimeoutMillis() {
162    return defaultPollTimeoutMillis;
163  }
164
165  /**
166   * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
167   *
168   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
169   */
170  public static long envDefaultTimeoutMillis() {
171    final String envMillis = System.getenv(DEFAULT_TIMEOUT_MILLIS_ENV);
172    if (envMillis == null) return DEFAULT_TIMEOUT_MILLIS;
173    else try {
174      return Long.parseLong(envMillis);
175    } catch (NumberFormatException ex) {
176      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_TIMEOUT_MILLIS_ENV, envMillis), ex);
177    }
178  }
179
180  /**
181   * Tries to parse the env variable {@code DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
182   *
183   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
184   */
185  public static long envDefaultNoSignalsTimeoutMillis() {
186    final String envMillis = System.getenv(DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV);
187    if (envMillis == null) return envDefaultTimeoutMillis();
188    else try {
189      return Long.parseLong(envMillis);
190    } catch (NumberFormatException ex) {
191      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV, envMillis), ex);
192    }
193  }
194
195  /**
196   * Tries to parse the env variable {@code DEFAULT_POLL_TIMEOUT_MILLIS_ENV} as long and returns the value if present OR its default value.
197   *
198   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
199   */
200  public static long envDefaultPollTimeoutMillis() {
201    final String envMillis = System.getenv(DEFAULT_POLL_TIMEOUT_MILLIS_ENV);
202    if (envMillis == null) return envDefaultTimeoutMillis();
203    else try {
204      return Long.parseLong(envMillis);
205    } catch (NumberFormatException ex) {
206      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_POLL_TIMEOUT_MILLIS_ENV, envMillis), ex);
207    }
208  }
209
210  /**
211   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
212   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
213   *
214   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
215   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
216   * from the environment using {@code env.dropAsyncError()}.
217   *
218   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
219   */
220  public void flop(String msg) {
221    try {
222      fail(msg);
223    } catch (Throwable t) {
224      asyncErrors.add(t);
225    }
226  }
227
228  /**
229   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
230   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
231   *
232   * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
233   *
234   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
235   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
236   * from the environment using {@code env.dropAsyncError()}.
237   *
238   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
239   */
240  public void flop(Throwable thr, String msg) {
241    try {
242      fail(msg, thr);
243    } catch (Throwable t) {
244      asyncErrors.add(thr);
245    }
246  }
247
248  /**
249   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
250   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
251   *
252   * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
253   *
254   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
255   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
256   * from the environment using {@code env.dropAsyncError()}.
257   *
258   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
259   */
260  public void flop(Throwable thr) {
261    try {
262      fail(thr.getMessage(), thr);
263    } catch (Throwable t) {
264      asyncErrors.add(thr);
265    }
266  }
267
268  /**
269   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
270   *
271   * This method DOES fail the test right away (it tries to, by throwing an AssertionException),
272   * in such it is different from {@link org.reactivestreams.tck.TestEnvironment#flop} which only records the error.
273   *
274   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
275   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
276   * from the environment using {@code env.dropAsyncError()}.
277   *
278   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
279   */
280  public <T> T flopAndFail(String msg) {
281    try {
282      fail(msg);
283    } catch (Throwable t) {
284      asyncErrors.add(t);
285      fail(msg, t);
286    }
287    return null; // unreachable, the previous block will always exit by throwing
288  }
289
290
291
292  public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
293    subscribe(pub, sub, defaultTimeoutMillis);
294  }
295
296  public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException {
297    pub.subscribe(sub);
298    sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub));
299    verifyNoAsyncErrorsNoDelay();
300  }
301
302  public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException {
303    ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this);
304    subscribe(pub, sub, defaultTimeoutMillis());
305    return sub;
306  }
307
308  public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException {
309    return newManualSubscriber(pub, defaultTimeoutMillis());
310  }
311
312  public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException {
313    ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this);
314    subscribe(pub, sub, timeoutMillis);
315    return sub;
316  }
317
318  public void clearAsyncErrors() {
319    asyncErrors.clear();
320  }
321
322  public Throwable dropAsyncError() {
323    try {
324      return asyncErrors.remove(0);
325    } catch (IndexOutOfBoundsException ex) {
326      return null;
327    }
328  }
329
330  /**
331   * Waits for {@link TestEnvironment#defaultNoSignalsTimeoutMillis()} and then verifies that no asynchronous errors
332   * were signalled pior to, or during that time (by calling {@code flop()}).
333   */
334  public void verifyNoAsyncErrors() {
335    verifyNoAsyncErrors(defaultNoSignalsTimeoutMillis());
336  }
337
338  /**
339   * This version of {@code verifyNoAsyncErrors} should be used when errors still could be signalled
340   * asynchronously during {@link TestEnvironment#defaultTimeoutMillis()} time.
341   * <p></p>
342   * It will immediatly check if any async errors were signaled (using {@link TestEnvironment#flop(String)},
343   * and if no errors encountered wait for another default timeout as the errors may yet be signalled.
344   * The initial check is performed in order to fail-fast in case of an already failed test.
345   */
346  public void verifyNoAsyncErrors(long delay) {
347    try {
348      verifyNoAsyncErrorsNoDelay();
349
350      Thread.sleep(delay);
351      verifyNoAsyncErrorsNoDelay();
352    } catch (InterruptedException e) {
353      throw new RuntimeException(e);
354    }
355  }
356
357  /**
358   * Verifies that no asynchronous errors were signalled pior to calling this method (by calling {@code flop()}).
359   * This version of verifyNoAsyncError <b>does not wait before checking for asynchronous errors</b>, and is to be used
360   * for example in tight loops etc.
361   */
362  public void verifyNoAsyncErrorsNoDelay() {
363    for (Throwable e : asyncErrors) {
364      if (e instanceof AssertionError) {
365        throw (AssertionError) e;
366      } else {
367        fail(String.format("Async error during test execution: %s", e.getMessage()), e);
368      }
369    }
370  }
371
372  /** If {@code TestEnvironment#printlnDebug} is true, print debug message to std out. */
373  public void debug(String msg) {
374    if (debugEnabled()) {
375      System.out.printf("[TCK-DEBUG] %s%n", msg);
376    }
377  }
378
379  public final boolean debugEnabled() {
380    return printlnDebug;
381  }
382
383  /**
384   * Looks for given {@code method} method in stack trace.
385   * Can be used to answer questions like "was this method called from onComplete?".
386   *
387   * @return the caller's StackTraceElement at which he the looked for method was found in the call stack, EMPTY otherwise
388   */
389  public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) {
390    final Throwable thr = new Throwable(); // gets the stacktrace
391
392    for (StackTraceElement stackElement : thr.getStackTrace()) {
393      if (stackElement.getMethodName().equals(method)) {
394        return Optional.of(stackElement);
395      }
396    }
397    return Optional.empty();
398  }
399
400  // ---- classes ----
401
402  /**
403   * {@link Subscriber} implementation which can be steered by test code and asserted on.
404   */
405  public static class ManualSubscriber<T> extends TestSubscriber<T> {
406    Receptacle<T> received;
407
408    public ManualSubscriber(TestEnvironment env) {
409      super(env);
410      received = new Receptacle<T>(this.env);
411    }
412
413    @Override
414    public void onNext(T element) {
415      try {
416        received.add(element);
417      } catch (IllegalStateException ex) {
418          // error message refinement
419          throw new SubscriberBufferOverflowException(
420            String.format("Received more than bufferSize (%d) onNext signals. " +
421                            "The Publisher probably emited more signals than expected!",
422                          received.QUEUE_SIZE), ex);
423      }
424    }
425
426    @Override
427    public void onComplete() {
428      received.complete();
429    }
430
431    public void request(long elements) {
432      subscription.value().request(elements);
433    }
434
435    public T requestNextElement() throws InterruptedException {
436      return requestNextElement(env.defaultTimeoutMillis());
437    }
438
439    public T requestNextElement(long timeoutMillis) throws InterruptedException {
440      return requestNextElement(timeoutMillis, "Did not receive expected element");
441    }
442
443    public T requestNextElement(String errorMsg) throws InterruptedException {
444      return requestNextElement(env.defaultTimeoutMillis(), errorMsg);
445    }
446
447    public T requestNextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
448      request(1);
449      return nextElement(timeoutMillis, errorMsg);
450    }
451
452    public Optional<T> requestNextElementOrEndOfStream() throws InterruptedException {
453      return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
454    }
455
456    public Optional<T> requestNextElementOrEndOfStream(String errorMsg) throws InterruptedException {
457      return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), errorMsg);
458    }
459
460    public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis) throws InterruptedException {
461      return requestNextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion");
462    }
463
464    public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
465      request(1);
466      return nextElementOrEndOfStream(timeoutMillis, errorMsg);
467    }
468
469    public void requestEndOfStream() throws InterruptedException {
470      requestEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
471    }
472
473    public void requestEndOfStream(long timeoutMillis) throws InterruptedException {
474      requestEndOfStream(timeoutMillis, "Did not receive expected stream completion");
475    }
476
477    public void requestEndOfStream(String errorMsg) throws InterruptedException {
478      requestEndOfStream(env.defaultTimeoutMillis(), errorMsg);
479    }
480
481    public void requestEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
482      request(1);
483      expectCompletion(timeoutMillis, errorMsg);
484    }
485
486    public List<T> requestNextElements(long elements) throws InterruptedException {
487      request(elements);
488      return nextElements(elements, env.defaultTimeoutMillis());
489    }
490
491    public List<T> requestNextElements(long elements, long timeoutMillis) throws InterruptedException {
492      request(elements);
493      return nextElements(elements, timeoutMillis, String.format("Did not receive %d expected elements", elements));
494    }
495
496    public List<T> requestNextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
497      request(elements);
498      return nextElements(elements, timeoutMillis, errorMsg);
499    }
500
501    public T nextElement() throws InterruptedException {
502      return nextElement(env.defaultTimeoutMillis());
503    }
504
505    public T nextElement(long timeoutMillis) throws InterruptedException {
506      return nextElement(timeoutMillis, "Did not receive expected element");
507    }
508
509    public T nextElement(String errorMsg) throws InterruptedException {
510      return nextElement(env.defaultTimeoutMillis(), errorMsg);
511    }
512
513    public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
514      return received.next(timeoutMillis, errorMsg);
515    }
516
517    public Optional<T> nextElementOrEndOfStream() throws InterruptedException {
518      return nextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
519    }
520
521    public Optional<T> nextElementOrEndOfStream(long timeoutMillis) throws InterruptedException {
522      return nextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion");
523    }
524
525    public Optional<T> nextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
526      return received.nextOrEndOfStream(timeoutMillis, errorMsg);
527    }
528
529    public List<T> nextElements(long elements) throws InterruptedException {
530      return nextElements(elements, env.defaultTimeoutMillis(), "Did not receive expected element or completion");
531    }
532
533    public List<T> nextElements(long elements, String errorMsg) throws InterruptedException {
534      return nextElements(elements, env.defaultTimeoutMillis(), errorMsg);
535    }
536
537    public List<T> nextElements(long elements, long timeoutMillis) throws InterruptedException {
538      return nextElements(elements, timeoutMillis, "Did not receive expected element or completion");
539    }
540
541    public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
542      return received.nextN(elements, timeoutMillis, errorMsg);
543    }
544
545    public void expectNext(T expected) throws InterruptedException {
546      expectNext(expected, env.defaultTimeoutMillis());
547    }
548
549    public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
550      T received = nextElement(timeoutMillis, "Did not receive expected element on downstream");
551      if (!received.equals(expected)) {
552        env.flop(String.format("Expected element %s on downstream but received %s", expected, received));
553      }
554    }
555
556    public void expectCompletion() throws InterruptedException {
557      expectCompletion(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
558    }
559
560    public void expectCompletion(long timeoutMillis) throws InterruptedException {
561      expectCompletion(timeoutMillis, "Did not receive expected stream completion");
562    }
563
564    public void expectCompletion(String errorMsg) throws InterruptedException {
565      expectCompletion(env.defaultTimeoutMillis(), errorMsg);
566    }
567
568    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
569      received.expectCompletion(timeoutMillis, errorMsg);
570    }
571
572    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception {
573      expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis());
574    }
575    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives) throws Exception {
576      expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis());
577    }
578
579    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
580    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception {
581      expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), timeoutMillis);
582    }
583
584    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, long timeoutMillis) throws Exception {
585      expectErrorWithMessage(expected, requiredMessagePartAlternatives, timeoutMillis, timeoutMillis);
586    }
587
588    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives,
589                                                             long totalTimeoutMillis, long pollTimeoutMillis) throws Exception {
590      final E err = expectError(expected, totalTimeoutMillis, pollTimeoutMillis);
591      final String message = err.getMessage();
592
593      boolean contains = false;
594      for (String requiredMessagePart : requiredMessagePartAlternatives)
595        if (message.contains(requiredMessagePart)) contains = true; // not short-circuting loop, it is expected to
596      assertTrue(contains,
597              String.format("Got expected exception [%s] but missing message part [%s], was: %s",
598                      err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage()));
599    }
600
601    public <E extends Throwable> E expectError(Class<E> expected) throws Exception {
602      return expectError(expected, env.defaultTimeoutMillis());
603    }
604
605    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception {
606      return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis());
607    }
608
609    public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception {
610      return expectError(expected, env.defaultTimeoutMillis(), errorMsg);
611    }
612
613    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception {
614      return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis(), errorMsg);
615    }
616
617    public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis) throws Exception {
618      return expectError(expected, totalTimeoutMillis, pollTimeoutMillis, String.format("Expected onError(%s)", expected.getName()));
619    }
620
621    public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis,
622                                               String errorMsg) throws Exception {
623      return received.expectError(expected, totalTimeoutMillis, pollTimeoutMillis, errorMsg);
624    }
625
626    public void expectNone() throws InterruptedException {
627      expectNone(env.defaultNoSignalsTimeoutMillis());
628    }
629
630    public void expectNone(String errMsgPrefix) throws InterruptedException {
631      expectNone(env.defaultNoSignalsTimeoutMillis(), errMsgPrefix);
632    }
633
634    public void expectNone(long withinMillis) throws InterruptedException {
635      expectNone(withinMillis, "Did not expect an element but got element");
636    }
637
638    public void expectNone(long withinMillis, String errMsgPrefix) throws InterruptedException {
639      received.expectNone(withinMillis, errMsgPrefix);
640    }
641
642  }
643
644  public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> {
645
646    public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) {
647      super(env);
648    }
649
650    @Override
651    public void onNext(T element) {
652      if (env.debugEnabled()) {
653        env.debug(String.format("%s::onNext(%s)", this, element));
654      }
655      if (subscription.isCompleted()) {
656        super.onNext(element);
657      } else {
658        env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
659      }
660    }
661
662    @Override
663    public void onComplete() {
664      if (env.debugEnabled()) {
665        env.debug(this + "::onComplete()");
666      }
667      if (subscription.isCompleted()) {
668        super.onComplete();
669      } else {
670        env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe");
671      }
672    }
673
674    @Override
675    public void onSubscribe(Subscription s) {
676      if (env.debugEnabled()) {
677        env.debug(String.format("%s::onSubscribe(%s)", this, s));
678      }
679      if (!subscription.isCompleted()) {
680        subscription.complete(s);
681      } else {
682        env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber");
683      }
684    }
685
686    @Override
687    public void onError(Throwable cause) {
688      if (env.debugEnabled()) {
689        env.debug(String.format("%s::onError(%s)", this, cause));
690      }
691      if (subscription.isCompleted()) {
692        super.onError(cause);
693      } else {
694        env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause));
695      }
696    }
697  }
698
699  /**
700   * Similar to {@link org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport}
701   * but does not accumulate values signalled via <code>onNext</code>, thus it can not be used to assert
702   * values signalled to this subscriber. Instead it may be used to quickly drain a given publisher.
703   */
704  public static class BlackholeSubscriberWithSubscriptionSupport<T>
705    extends ManualSubscriberWithSubscriptionSupport<T> {
706
707    public BlackholeSubscriberWithSubscriptionSupport(TestEnvironment env) {
708      super(env);
709    }
710
711    @Override
712    public void onNext(T element) {
713      if (env.debugEnabled()) {
714        env.debug(String.format("%s::onNext(%s)", this, element));
715      }
716      if (!subscription.isCompleted()) {
717        env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
718      }
719    }
720
721    @Override
722    public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
723      throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!");
724    }
725
726    @Override
727    public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
728      throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!");
729    }
730  }
731
732  public static class TestSubscriber<T> implements Subscriber<T> {
733    final Promise<Subscription> subscription;
734
735    protected final TestEnvironment env;
736
737    public TestSubscriber(TestEnvironment env) {
738      this.env = env;
739      subscription = new Promise<Subscription>(env);
740    }
741
742    @Override
743    public void onError(Throwable cause) {
744      env.flop(cause, String.format("Unexpected Subscriber::onError(%s)", cause));
745    }
746
747    @Override
748    public void onComplete() {
749      env.flop("Unexpected Subscriber::onComplete()");
750    }
751
752    @Override
753    public void onNext(T element) {
754      env.flop(String.format("Unexpected Subscriber::onNext(%s)", element));
755    }
756
757    @Override
758    public void onSubscribe(Subscription subscription) {
759      env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription));
760    }
761
762    public void cancel() {
763      if (subscription.isCompleted()) {
764        subscription.value().cancel();
765      } else {
766        env.flop("Cannot cancel a subscription before having received it");
767      }
768    }
769  }
770
771  public static class ManualPublisher<T> implements Publisher<T> {
772    protected final TestEnvironment env;
773
774    protected long pendingDemand = 0L;
775    protected Promise<Subscriber<? super T>> subscriber;
776
777    protected final Receptacle<Long> requests;
778
779    protected final Latch cancelled;
780
781    public ManualPublisher(TestEnvironment env) {
782      this.env = env;
783      requests = new Receptacle<Long>(env);
784      cancelled = new Latch(env);
785      subscriber = new Promise<Subscriber<? super T>>(this.env);
786    }
787
788    @Override
789    public void subscribe(Subscriber<? super T> s) {
790      if (!subscriber.isCompleted()) {
791        subscriber.completeImmediatly(s);
792
793        Subscription subs = new Subscription() {
794          @Override
795          public void request(long elements) {
796            requests.add(elements);
797          }
798
799          @Override
800          public void cancel() {
801            cancelled.close();
802          }
803        };
804        s.onSubscribe(subs);
805
806      } else {
807        env.flop("TestPublisher doesn't support more than one Subscriber");
808      }
809    }
810
811    public void sendNext(T element) {
812      if (subscriber.isCompleted()) {
813        subscriber.value().onNext(element);
814      } else {
815        env.flop("Cannot sendNext before having a Subscriber");
816      }
817    }
818
819    public void sendCompletion() {
820      if (subscriber.isCompleted()) {
821        subscriber.value().onComplete();
822      } else {
823        env.flop("Cannot sendCompletion before having a Subscriber");
824      }
825    }
826
827    public void sendError(Throwable cause) {
828      if (subscriber.isCompleted()) {
829        subscriber.value().onError(cause);
830      } else {
831        env.flop("Cannot sendError before having a Subscriber");
832      }
833    }
834
835    public long expectRequest() throws InterruptedException {
836      return expectRequest(env.defaultTimeoutMillis());
837    }
838
839    public long expectRequest(long timeoutMillis) throws InterruptedException {
840      long requested = requests.next(timeoutMillis, "Did not receive expected `request` call");
841      if (requested <= 0) {
842        return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested));
843      } else {
844        pendingDemand += requested;
845        return requested;
846      }
847    }
848
849
850    public long expectRequest(long timeoutMillis, String errorMessageAddendum) throws InterruptedException {
851      long requested = requests.next(timeoutMillis, String.format("Did not receive expected `request` call. %s", errorMessageAddendum));
852      if (requested <= 0) {
853        return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested));
854      } else {
855        pendingDemand += requested;
856        return requested;
857      }
858    }
859
860    public void expectExactRequest(long expected) throws InterruptedException {
861      expectExactRequest(expected, env.defaultTimeoutMillis());
862    }
863
864    public void expectExactRequest(long expected, long timeoutMillis) throws InterruptedException {
865      long requested = expectRequest(timeoutMillis);
866      if (requested != expected) {
867        env.flop(String.format("Received `request(%d)` on upstream but expected `request(%d)`", requested, expected));
868      }
869      pendingDemand += requested;
870    }
871
872    public void expectNoRequest() throws InterruptedException {
873      expectNoRequest(env.defaultTimeoutMillis());
874    }
875
876    public void expectNoRequest(long timeoutMillis) throws InterruptedException {
877      requests.expectNone(timeoutMillis, "Received an unexpected call to: request: ");
878    }
879
880    public void expectCancelling() throws InterruptedException {
881      expectCancelling(env.defaultTimeoutMillis());
882    }
883
884    public void expectCancelling(long timeoutMillis) throws InterruptedException {
885      cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription");
886    }
887
888    public boolean isCancelled() throws InterruptedException {
889      return cancelled.isClosed();
890    }
891  }
892
893  /**
894   * Like a CountDownLatch, but resettable and with some convenience methods
895   */
896  public static class Latch {
897    private final TestEnvironment env;
898    volatile private CountDownLatch countDownLatch = new CountDownLatch(1);
899
900    public Latch(TestEnvironment env) {
901      this.env = env;
902    }
903
904    public void reOpen() {
905      countDownLatch = new CountDownLatch(1);
906    }
907
908    public boolean isClosed() {
909      return countDownLatch.getCount() == 0;
910    }
911
912    public void close() {
913      countDownLatch.countDown();
914    }
915
916    public void assertClosed(String openErrorMsg) {
917      if (!isClosed()) {
918        env.flop(new ExpectedClosedLatchException(openErrorMsg));
919      }
920    }
921
922    public void assertOpen(String closedErrorMsg) {
923      if (isClosed()) {
924        env.flop(new ExpectedOpenLatchException(closedErrorMsg));
925      }
926    }
927
928    public void expectClose(String notClosedErrorMsg) throws InterruptedException {
929      expectClose(env.defaultTimeoutMillis(), notClosedErrorMsg);
930    }
931
932    public void expectClose(long timeoutMillis, String notClosedErrorMsg) throws InterruptedException {
933      countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
934      if (countDownLatch.getCount() > 0) {
935        env.flop(String.format("%s within %d ms", notClosedErrorMsg, timeoutMillis));
936      }
937    }
938
939    static final class ExpectedOpenLatchException extends RuntimeException {
940      public ExpectedOpenLatchException(String message) {
941        super(message);
942      }
943    }
944
945    static final class ExpectedClosedLatchException extends RuntimeException {
946      public ExpectedClosedLatchException(String message) {
947        super(message);
948      }
949    }
950
951  }
952
953  // simple promise for *one* value, which cannot be reset
954  public static class Promise<T> {
955    private final TestEnvironment env;
956
957    public static <T> Promise<T> completed(TestEnvironment env, T value) {
958      Promise<T> promise = new Promise<T>(env);
959      promise.completeImmediatly(value);
960      return promise;
961    }
962
963    public Promise(TestEnvironment env) {
964      this.env = env;
965    }
966
967    private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1);
968    private AtomicReference<T> _value = new AtomicReference<T>();
969
970    public T value() {
971      final T value = _value.get();
972      if (value != null) {
973        return value;
974      } else {
975        env.flop("Cannot access promise value before completion");
976        return null;
977      }
978    }
979
980    public boolean isCompleted() {
981      return _value.get() != null;
982    }
983
984    /**
985     * Allows using expectCompletion to await for completion of the value and complete it _then_
986     */
987    public void complete(T value) {
988      if (_value.compareAndSet(null, value)) {
989        // we add the value to the queue such to wake up any expectCompletion which was triggered before complete() was called
990        abq.add(value);
991      } else {
992        env.flop(String.format("Cannot complete a promise more than once! Present value: %s, attempted to set: %s", _value.get(), value));
993      }
994    }
995
996    /**
997     * Same as complete.
998     *
999     * Keeping this method for binary compatibility.
1000     */
1001    public void completeImmediatly(T value) {
1002      complete(value);
1003    }
1004
1005    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
1006      if (!isCompleted()) {
1007        T val = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
1008
1009        if (val == null) {
1010          env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
1011        }
1012      }
1013    }
1014  }
1015
1016  // a "Promise" for multiple values, which also supports "end-of-stream reached"
1017  public static class Receptacle<T> {
1018    final int QUEUE_SIZE = 2 * TEST_BUFFER_SIZE;
1019    private final TestEnvironment env;
1020
1021    private final ArrayBlockingQueue<Optional<T>> abq = new ArrayBlockingQueue<Optional<T>>(QUEUE_SIZE);
1022
1023    private final Latch completedLatch;
1024
1025    Receptacle(TestEnvironment env) {
1026      this.env = env;
1027      this.completedLatch = new Latch(env);
1028    }
1029
1030    public void add(T value) {
1031      completedLatch.assertOpen(String.format("Unexpected element %s received after stream completed", value));
1032
1033      abq.add(Optional.of(value));
1034    }
1035
1036    public void complete() {
1037      completedLatch.assertOpen("Unexpected additional complete signal received!");
1038      completedLatch.close();
1039
1040      abq.add(Optional.<T>empty());
1041    }
1042
1043    public T next(long timeoutMillis, String errorMsg) throws InterruptedException {
1044      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
1045
1046      if (value == null) {
1047        return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis));
1048      } else if (value.isDefined()) {
1049        return value.get();
1050      } else {
1051        return env.flopAndFail("Expected element but got end-of-stream");
1052      }
1053    }
1054
1055    public Optional<T> nextOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
1056      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
1057
1058      if (value == null) {
1059        env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
1060        return Optional.empty();
1061      }
1062
1063      return value;
1064    }
1065
1066    /**
1067     * @param timeoutMillis total timeout time for awaiting all {@code elements} number of elements
1068     */
1069    public List<T> nextN(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
1070      List<T> result = new LinkedList<T>();
1071      long remaining = elements;
1072      long deadline = System.currentTimeMillis() + timeoutMillis;
1073      while (remaining > 0) {
1074        long remainingMillis = deadline - System.currentTimeMillis();
1075
1076        result.add(next(remainingMillis, errorMsg));
1077        remaining--;
1078      }
1079
1080      return result;
1081    }
1082
1083
1084    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
1085      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
1086
1087      if (value == null) {
1088        env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
1089      } else if (value.isDefined()) {
1090        env.flop(String.format("Expected end-of-stream but got element [%s]", value.get()));
1091      } // else, ok
1092    }
1093
1094    /**
1095     * @deprecated Deprecated in favor of {@link #expectError(Class, long, long, String)}.
1096     */
1097    @Deprecated
1098    public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception {
1099      return expectError(clazz, timeoutMillis, timeoutMillis, errorMsg);
1100    }
1101
1102    @SuppressWarnings("unchecked")
1103    final <E extends Throwable> E expectError(Class<E> clazz, final long totalTimeoutMillis,
1104                                              long pollTimeoutMillis,
1105                                              String errorMsg) throws Exception {
1106      long totalTimeoutRemainingNs = MILLISECONDS.toNanos(totalTimeoutMillis);
1107      long timeStampANs = System.nanoTime();
1108      long timeStampBNs;
1109
1110      for (;;) {
1111        Thread.sleep(Math.min(pollTimeoutMillis, NANOSECONDS.toMillis(totalTimeoutRemainingNs)));
1112
1113        if (env.asyncErrors.isEmpty()) {
1114          timeStampBNs = System.nanoTime();
1115          totalTimeoutRemainingNs =- timeStampBNs - timeStampANs;
1116          timeStampANs = timeStampBNs;
1117
1118          if (totalTimeoutRemainingNs <= 0) {
1119            return env.flopAndFail(String.format("%s within %d ms", errorMsg, totalTimeoutMillis));
1120          }
1121        } else {
1122          // ok, there was an expected error
1123          Throwable thrown = env.asyncErrors.remove(0);
1124
1125          if (clazz.isInstance(thrown)) {
1126            return (E) thrown;
1127          } else {
1128
1129            return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s",
1130                    errorMsg, totalTimeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName()));
1131          }
1132        }
1133      }
1134    }
1135
1136    public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException {
1137      Thread.sleep(withinMillis);
1138      Optional<T> value = abq.poll();
1139
1140      if (value == null) {
1141        // ok
1142      } else if (value.isDefined()) {
1143        env.flop(String.format("%s [%s]", errorMsgPrefix, value.get()));
1144      } else {
1145        env.flop("Expected no element but got end-of-stream");
1146      }
1147    }
1148  }
1149}
1150