001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck;
013
014import org.reactivestreams.Publisher;
015import org.reactivestreams.Subscriber;
016import org.reactivestreams.Subscription;
017import org.reactivestreams.tck.flow.support.Optional;
018import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException;
019
020import java.util.Collections;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.concurrent.ArrayBlockingQueue;
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicReference;
028
029import static java.util.concurrent.TimeUnit.MILLISECONDS;
030import static java.util.concurrent.TimeUnit.NANOSECONDS;
031import static org.testng.Assert.assertTrue;
032import static org.testng.Assert.fail;
033
034public class TestEnvironment {
035  public static final int TEST_BUFFER_SIZE = 16;
036
037  private static final String DEFAULT_TIMEOUT_MILLIS_ENV = "DEFAULT_TIMEOUT_MILLIS";
038  private static final long DEFAULT_TIMEOUT_MILLIS = 100;
039
040  private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS";
041  private static final String DEFAULT_POLL_TIMEOUT_MILLIS_ENV = "DEFAULT_POLL_TIMEOUT_MILLIS_ENV";
042
043  private final long defaultTimeoutMillis;
044  private final long defaultPollTimeoutMillis;
045  private final long defaultNoSignalsTimeoutMillis;
046  private final boolean printlnDebug;
047
048  private CopyOnWriteArrayList<Throwable> asyncErrors = new CopyOnWriteArrayList<Throwable>();
049
050  /**
051   * Tests must specify the timeout for expected outcome of asynchronous
052   * interactions. Longer timeout does not invalidate the correctness of
053   * the implementation, but can in some cases result in longer time to
054   * run the tests.
055   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
056   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
057   * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
058    *                                preempted by an asynchronous event.
059   * @param printlnDebug         if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
060   */
061  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis,
062                         boolean printlnDebug) {
063    this.defaultTimeoutMillis = defaultTimeoutMillis;
064    this.defaultPollTimeoutMillis = defaultPollTimeoutMillis;
065    this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis;
066    this.printlnDebug = printlnDebug;
067  }
068
069  /**
070   * Tests must specify the timeout for expected outcome of asynchronous
071   * interactions. Longer timeout does not invalidate the correctness of
072   * the implementation, but can in some cases result in longer time to
073   * run the tests.
074   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
075   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
076   * @param printlnDebug         if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
077   */
078  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) {
079    this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultTimeoutMillis, printlnDebug);
080  }
081
082  /**
083   * Tests must specify the timeout for expected outcome of asynchronous
084   * interactions. Longer timeout does not invalidate the correctness of
085   * the implementation, but can in some cases result in longer time to
086   * run the tests.
087   *
088   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
089   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
090   * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
091   *                                 preempted by an asynchronous event.
092   */
093  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis) {
094      this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultPollTimeoutMillis, false);
095  }
096
097  /**
098   * Tests must specify the timeout for expected outcome of asynchronous
099   * interactions. Longer timeout does not invalidate the correctness of
100   * the implementation, but can in some cases result in longer time to
101   * run the tests.
102   *
103   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
104   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
105   */
106  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) {
107    this(defaultTimeoutMillis, defaultTimeoutMillis, defaultNoSignalsTimeoutMillis);
108  }
109
110  /**
111   * Tests must specify the timeout for expected outcome of asynchronous
112   * interactions. Longer timeout does not invalidate the correctness of
113   * the implementation, but can in some cases result in longer time to
114   * run the tests.
115   *
116   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
117   */
118  public TestEnvironment(long defaultTimeoutMillis) {
119    this(defaultTimeoutMillis, defaultTimeoutMillis, defaultTimeoutMillis);
120  }
121
122  /**
123   * Tests must specify the timeout for expected outcome of asynchronous
124   * interactions. Longer timeout does not invalidate the correctness of
125   * the implementation, but can in some cases result in longer time to
126   * run the tests.
127   *
128   * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS}
129   * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used.
130   *
131   * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
132   *                     often helpful to pinpoint simple race conditions etc.
133   */
134  public TestEnvironment(boolean printlnDebug) {
135    this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), envDefaultPollTimeoutMillis(), printlnDebug);
136  }
137
138  /**
139   * Tests must specify the timeout for expected outcome of asynchronous
140   * interactions. Longer timeout does not invalidate the correctness of
141   * the implementation, but can in some cases result in longer time to
142   * run the tests.
143   *
144   * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS}
145   * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used.
146   */
147  public TestEnvironment() {
148    this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis());
149  }
150
151  /** This timeout is used when waiting for a signal to arrive. */
152  public long defaultTimeoutMillis() {
153    return defaultTimeoutMillis;
154  }
155
156  /**
157   * This timeout is used when asserting that no further signals are emitted.
158   * Note that this timeout default
159   */
160  public long defaultNoSignalsTimeoutMillis() {
161    return defaultNoSignalsTimeoutMillis;
162  }
163
164  /**
165   * The default amount of time to poll for events if {@code defaultTimeoutMillis} isn't preempted by an asynchronous
166   * event.
167   */
168  public long defaultPollTimeoutMillis() {
169    return defaultPollTimeoutMillis;
170  }
171
172  /**
173   * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
174   *
175   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
176   */
177  public static long envDefaultTimeoutMillis() {
178    final String envMillis = System.getenv(DEFAULT_TIMEOUT_MILLIS_ENV);
179    if (envMillis == null) return DEFAULT_TIMEOUT_MILLIS;
180    else try {
181      return Long.parseLong(envMillis);
182    } catch (NumberFormatException ex) {
183      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_TIMEOUT_MILLIS_ENV, envMillis), ex);
184    }
185  }
186
187  /**
188   * Tries to parse the env variable {@code DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
189   *
190   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
191   */
192  public static long envDefaultNoSignalsTimeoutMillis() {
193    final String envMillis = System.getenv(DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV);
194    if (envMillis == null) return envDefaultTimeoutMillis();
195    else try {
196      return Long.parseLong(envMillis);
197    } catch (NumberFormatException ex) {
198      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV, envMillis), ex);
199    }
200  }
201
202  /**
203   * Tries to parse the env variable {@code DEFAULT_POLL_TIMEOUT_MILLIS_ENV} as long and returns the value if present OR its default value.
204   *
205   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
206   */
207  public static long envDefaultPollTimeoutMillis() {
208    final String envMillis = System.getenv(DEFAULT_POLL_TIMEOUT_MILLIS_ENV);
209    if (envMillis == null) return envDefaultTimeoutMillis();
210    else try {
211      return Long.parseLong(envMillis);
212    } catch (NumberFormatException ex) {
213      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_POLL_TIMEOUT_MILLIS_ENV, envMillis), ex);
214    }
215  }
216
217  /**
218   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
219   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
220   *
221   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
222   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
223   * from the environment using {@code env.dropAsyncError()}.
224   *
225   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
226   */
227  public void flop(String msg) {
228    try {
229      fail(msg);
230    } catch (Throwable t) {
231      asyncErrors.add(t);
232    }
233  }
234
235  /**
236   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
237   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
238   *
239   * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
240   *
241   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
242   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
243   * from the environment using {@code env.dropAsyncError()}.
244   *
245   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
246   */
247  public void flop(Throwable thr, String msg) {
248    try {
249      fail(msg, thr);
250    } catch (Throwable t) {
251      asyncErrors.add(thr);
252    }
253  }
254
255  /**
256   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
257   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
258   *
259   * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
260   *
261   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
262   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
263   * from the environment using {@code env.dropAsyncError()}.
264   *
265   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
266   */
267  public void flop(Throwable thr) {
268    try {
269      fail(thr.getMessage(), thr);
270    } catch (Throwable t) {
271      asyncErrors.add(thr);
272    }
273  }
274
275  /**
276   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
277   *
278   * This method DOES fail the test right away (it tries to, by throwing an AssertionException),
279   * in such it is different from {@link org.reactivestreams.tck.TestEnvironment#flop} which only records the error.
280   *
281   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
282   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
283   * from the environment using {@code env.dropAsyncError()}.
284   *
285   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
286   */
287  public <T> T flopAndFail(String msg) {
288    try {
289      fail(msg);
290    } catch (Throwable t) {
291      asyncErrors.add(t);
292      fail(msg, t);
293    }
294    return null; // unreachable, the previous block will always exit by throwing
295  }
296
297
298
299  public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
300    subscribe(pub, sub, defaultTimeoutMillis);
301  }
302
303  public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException {
304    pub.subscribe(sub);
305    sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub));
306    verifyNoAsyncErrorsNoDelay();
307  }
308
309  public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException {
310    ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this);
311    subscribe(pub, sub, defaultTimeoutMillis());
312    return sub;
313  }
314
315  public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException {
316    return newManualSubscriber(pub, defaultTimeoutMillis());
317  }
318
319  public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException {
320    ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this);
321    subscribe(pub, sub, timeoutMillis);
322    return sub;
323  }
324
325  public void clearAsyncErrors() {
326    asyncErrors.clear();
327  }
328
329  public Throwable dropAsyncError() {
330    try {
331      return asyncErrors.remove(0);
332    } catch (IndexOutOfBoundsException ex) {
333      return null;
334    }
335  }
336
337  /**
338   * Waits for {@link TestEnvironment#defaultNoSignalsTimeoutMillis()} and then verifies that no asynchronous errors
339   * were signalled pior to, or during that time (by calling {@code flop()}).
340   */
341  public void verifyNoAsyncErrors() {
342    verifyNoAsyncErrors(defaultNoSignalsTimeoutMillis());
343  }
344
345  /**
346   * This version of {@code verifyNoAsyncErrors} should be used when errors still could be signalled
347   * asynchronously during {@link TestEnvironment#defaultTimeoutMillis()} time.
348   * <p></p>
349   * It will immediatly check if any async errors were signaled (using {@link TestEnvironment#flop(String)},
350   * and if no errors encountered wait for another default timeout as the errors may yet be signalled.
351   * The initial check is performed in order to fail-fast in case of an already failed test.
352   */
353  public void verifyNoAsyncErrors(long delay) {
354    try {
355      verifyNoAsyncErrorsNoDelay();
356
357      Thread.sleep(delay);
358      verifyNoAsyncErrorsNoDelay();
359    } catch (InterruptedException e) {
360      throw new RuntimeException(e);
361    }
362  }
363
364  /**
365   * Verifies that no asynchronous errors were signalled pior to calling this method (by calling {@code flop()}).
366   * This version of verifyNoAsyncError <b>does not wait before checking for asynchronous errors</b>, and is to be used
367   * for example in tight loops etc.
368   */
369  public void verifyNoAsyncErrorsNoDelay() {
370    for (Throwable e : asyncErrors) {
371      if (e instanceof AssertionError) {
372        throw (AssertionError) e;
373      } else {
374        fail(String.format("Async error during test execution: %s", e.getMessage()), e);
375      }
376    }
377  }
378
379  /** If {@code TestEnvironment#printlnDebug} is true, print debug message to std out. */
380  public void debug(String msg) {
381    if (debugEnabled()) {
382      System.out.printf("[TCK-DEBUG] %s%n", msg);
383    }
384  }
385
386  public final boolean debugEnabled() {
387    return printlnDebug;
388  }
389
390  /**
391   * Looks for given {@code method} method in stack trace.
392   * Can be used to answer questions like "was this method called from onComplete?".
393   *
394   * @return the caller's StackTraceElement at which he the looked for method was found in the call stack, EMPTY otherwise
395   */
396  public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) {
397    final Throwable thr = new Throwable(); // gets the stacktrace
398
399    for (StackTraceElement stackElement : thr.getStackTrace()) {
400      if (stackElement.getMethodName().equals(method)) {
401        return Optional.of(stackElement);
402      }
403    }
404    return Optional.empty();
405  }
406
407  // ---- classes ----
408
409  /**
410   * {@link Subscriber} implementation which can be steered by test code and asserted on.
411   */
412  public static class ManualSubscriber<T> extends TestSubscriber<T> {
413    Receptacle<T> received;
414
415    public ManualSubscriber(TestEnvironment env) {
416      super(env);
417      received = new Receptacle<T>(this.env);
418    }
419
420    @Override
421    public void onNext(T element) {
422      try {
423        received.add(element);
424      } catch (IllegalStateException ex) {
425          // error message refinement
426          throw new SubscriberBufferOverflowException(
427            String.format("Received more than bufferSize (%d) onNext signals. " +
428                            "The Publisher probably emited more signals than expected!",
429                          received.QUEUE_SIZE), ex);
430      }
431    }
432
433    @Override
434    public void onComplete() {
435      received.complete();
436    }
437
438    public void request(long elements) {
439      subscription.value().request(elements);
440    }
441
442    public T requestNextElement() throws InterruptedException {
443      return requestNextElement(env.defaultTimeoutMillis());
444    }
445
446    public T requestNextElement(long timeoutMillis) throws InterruptedException {
447      return requestNextElement(timeoutMillis, "Did not receive expected element");
448    }
449
450    public T requestNextElement(String errorMsg) throws InterruptedException {
451      return requestNextElement(env.defaultTimeoutMillis(), errorMsg);
452    }
453
454    public T requestNextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
455      request(1);
456      return nextElement(timeoutMillis, errorMsg);
457    }
458
459    public Optional<T> requestNextElementOrEndOfStream() throws InterruptedException {
460      return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
461    }
462
463    public Optional<T> requestNextElementOrEndOfStream(String errorMsg) throws InterruptedException {
464      return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), errorMsg);
465    }
466
467    public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis) throws InterruptedException {
468      return requestNextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion");
469    }
470
471    public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
472      request(1);
473      return nextElementOrEndOfStream(timeoutMillis, errorMsg);
474    }
475
476    public void requestEndOfStream() throws InterruptedException {
477      requestEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
478    }
479
480    public void requestEndOfStream(long timeoutMillis) throws InterruptedException {
481      requestEndOfStream(timeoutMillis, "Did not receive expected stream completion");
482    }
483
484    public void requestEndOfStream(String errorMsg) throws InterruptedException {
485      requestEndOfStream(env.defaultTimeoutMillis(), errorMsg);
486    }
487
488    public void requestEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
489      request(1);
490      expectCompletion(timeoutMillis, errorMsg);
491    }
492
493    public List<T> requestNextElements(long elements) throws InterruptedException {
494      request(elements);
495      return nextElements(elements, env.defaultTimeoutMillis());
496    }
497
498    public List<T> requestNextElements(long elements, long timeoutMillis) throws InterruptedException {
499      request(elements);
500      return nextElements(elements, timeoutMillis, String.format("Did not receive %d expected elements", elements));
501    }
502
503    public List<T> requestNextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
504      request(elements);
505      return nextElements(elements, timeoutMillis, errorMsg);
506    }
507
508    public T nextElement() throws InterruptedException {
509      return nextElement(env.defaultTimeoutMillis());
510    }
511
512    public T nextElement(long timeoutMillis) throws InterruptedException {
513      return nextElement(timeoutMillis, "Did not receive expected element");
514    }
515
516    public T nextElement(String errorMsg) throws InterruptedException {
517      return nextElement(env.defaultTimeoutMillis(), errorMsg);
518    }
519
520    public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
521      return received.next(timeoutMillis, errorMsg);
522    }
523
524    public Optional<T> nextElementOrEndOfStream() throws InterruptedException {
525      return nextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
526    }
527
528    public Optional<T> nextElementOrEndOfStream(long timeoutMillis) throws InterruptedException {
529      return nextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion");
530    }
531
532    public Optional<T> nextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
533      return received.nextOrEndOfStream(timeoutMillis, errorMsg);
534    }
535
536    public List<T> nextElements(long elements) throws InterruptedException {
537      return nextElements(elements, env.defaultTimeoutMillis(), "Did not receive expected element or completion");
538    }
539
540    public List<T> nextElements(long elements, String errorMsg) throws InterruptedException {
541      return nextElements(elements, env.defaultTimeoutMillis(), errorMsg);
542    }
543
544    public List<T> nextElements(long elements, long timeoutMillis) throws InterruptedException {
545      return nextElements(elements, timeoutMillis, "Did not receive expected element or completion");
546    }
547
548    public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
549      return received.nextN(elements, timeoutMillis, errorMsg);
550    }
551
552    public void expectNext(T expected) throws InterruptedException {
553      expectNext(expected, env.defaultTimeoutMillis());
554    }
555
556    public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
557      T received = nextElement(timeoutMillis, "Did not receive expected element on downstream");
558      if (!received.equals(expected)) {
559        env.flop(String.format("Expected element %s on downstream but received %s", expected, received));
560      }
561    }
562
563    public void expectCompletion() throws InterruptedException {
564      expectCompletion(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
565    }
566
567    public void expectCompletion(long timeoutMillis) throws InterruptedException {
568      expectCompletion(timeoutMillis, "Did not receive expected stream completion");
569    }
570
571    public void expectCompletion(String errorMsg) throws InterruptedException {
572      expectCompletion(env.defaultTimeoutMillis(), errorMsg);
573    }
574
575    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
576      received.expectCompletion(timeoutMillis, errorMsg);
577    }
578
579    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception {
580      expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis());
581    }
582    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives) throws Exception {
583      expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis());
584    }
585
586    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
587    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception {
588      expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), timeoutMillis);
589    }
590
591    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, long timeoutMillis) throws Exception {
592      expectErrorWithMessage(expected, requiredMessagePartAlternatives, timeoutMillis, timeoutMillis);
593    }
594
595    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives,
596                                                             long totalTimeoutMillis, long pollTimeoutMillis) throws Exception {
597      final E err = expectError(expected, totalTimeoutMillis, pollTimeoutMillis);
598      final String message = err.getMessage();
599
600      boolean contains = false;
601      for (String requiredMessagePart : requiredMessagePartAlternatives)
602        if (message.contains(requiredMessagePart)) contains = true; // not short-circuting loop, it is expected to
603      assertTrue(contains,
604              String.format("Got expected exception [%s] but missing message part [%s], was: %s",
605                      err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage()));
606    }
607
608    public <E extends Throwable> E expectError(Class<E> expected) throws Exception {
609      return expectError(expected, env.defaultTimeoutMillis());
610    }
611
612    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception {
613      return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis());
614    }
615
616    public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception {
617      return expectError(expected, env.defaultTimeoutMillis(), errorMsg);
618    }
619
620    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception {
621      return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis(), errorMsg);
622    }
623
624    public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis) throws Exception {
625      return expectError(expected, totalTimeoutMillis, pollTimeoutMillis, String.format("Expected onError(%s)", expected.getName()));
626    }
627
628    public <E extends Throwable> E expectError(Class<E> expected, long totalTimeoutMillis, long pollTimeoutMillis,
629                                               String errorMsg) throws Exception {
630      return received.expectError(expected, totalTimeoutMillis, pollTimeoutMillis, errorMsg);
631    }
632
633    public void expectNone() throws InterruptedException {
634      expectNone(env.defaultNoSignalsTimeoutMillis());
635    }
636
637    public void expectNone(String errMsgPrefix) throws InterruptedException {
638      expectNone(env.defaultNoSignalsTimeoutMillis(), errMsgPrefix);
639    }
640
641    public void expectNone(long withinMillis) throws InterruptedException {
642      expectNone(withinMillis, "Did not expect an element but got element");
643    }
644
645    public void expectNone(long withinMillis, String errMsgPrefix) throws InterruptedException {
646      received.expectNone(withinMillis, errMsgPrefix);
647    }
648
649  }
650
651  public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> {
652
653    public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) {
654      super(env);
655    }
656
657    @Override
658    public void onNext(T element) {
659      if (env.debugEnabled()) {
660        env.debug(String.format("%s::onNext(%s)", this, element));
661      }
662      if (subscription.isCompleted()) {
663        super.onNext(element);
664      } else {
665        env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
666      }
667    }
668
669    @Override
670    public void onComplete() {
671      if (env.debugEnabled()) {
672        env.debug(this + "::onComplete()");
673      }
674      if (subscription.isCompleted()) {
675        super.onComplete();
676      } else {
677        env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe");
678      }
679    }
680
681    @Override
682    public void onSubscribe(Subscription s) {
683      if (env.debugEnabled()) {
684        env.debug(String.format("%s::onSubscribe(%s)", this, s));
685      }
686      if (!subscription.isCompleted()) {
687        subscription.complete(s);
688      } else {
689        env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber");
690      }
691    }
692
693    @Override
694    public void onError(Throwable cause) {
695      if (env.debugEnabled()) {
696        env.debug(String.format("%s::onError(%s)", this, cause));
697      }
698      if (subscription.isCompleted()) {
699        super.onError(cause);
700      } else {
701        env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause));
702      }
703    }
704  }
705
706  /**
707   * Similar to {@link org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport}
708   * but does not accumulate values signalled via <code>onNext</code>, thus it can not be used to assert
709   * values signalled to this subscriber. Instead it may be used to quickly drain a given publisher.
710   */
711  public static class BlackholeSubscriberWithSubscriptionSupport<T>
712    extends ManualSubscriberWithSubscriptionSupport<T> {
713
714    public BlackholeSubscriberWithSubscriptionSupport(TestEnvironment env) {
715      super(env);
716    }
717
718    @Override
719    public void onNext(T element) {
720      if (env.debugEnabled()) {
721        env.debug(String.format("%s::onNext(%s)", this, element));
722      }
723      if (!subscription.isCompleted()) {
724        env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
725      }
726    }
727
728    @Override
729    public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
730      throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!");
731    }
732
733    @Override
734    public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
735      throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!");
736    }
737  }
738
739  public static class TestSubscriber<T> implements Subscriber<T> {
740    final Promise<Subscription> subscription;
741
742    protected final TestEnvironment env;
743
744    public TestSubscriber(TestEnvironment env) {
745      this.env = env;
746      subscription = new Promise<Subscription>(env);
747    }
748
749    @Override
750    public void onError(Throwable cause) {
751      env.flop(cause, String.format("Unexpected Subscriber::onError(%s)", cause));
752    }
753
754    @Override
755    public void onComplete() {
756      env.flop("Unexpected Subscriber::onComplete()");
757    }
758
759    @Override
760    public void onNext(T element) {
761      env.flop(String.format("Unexpected Subscriber::onNext(%s)", element));
762    }
763
764    @Override
765    public void onSubscribe(Subscription subscription) {
766      env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription));
767    }
768
769    public void cancel() {
770      if (subscription.isCompleted()) {
771        subscription.value().cancel();
772      } else {
773        env.flop("Cannot cancel a subscription before having received it");
774      }
775    }
776  }
777
778  public static class ManualPublisher<T> implements Publisher<T> {
779    protected final TestEnvironment env;
780
781    protected long pendingDemand = 0L;
782    protected Promise<Subscriber<? super T>> subscriber;
783
784    protected final Receptacle<Long> requests;
785
786    protected final Latch cancelled;
787
788    public ManualPublisher(TestEnvironment env) {
789      this.env = env;
790      requests = new Receptacle<Long>(env);
791      cancelled = new Latch(env);
792      subscriber = new Promise<Subscriber<? super T>>(this.env);
793    }
794
795    @Override
796    public void subscribe(Subscriber<? super T> s) {
797      if (!subscriber.isCompleted()) {
798        subscriber.completeImmediatly(s);
799
800        Subscription subs = new Subscription() {
801          @Override
802          public void request(long elements) {
803            requests.add(elements);
804          }
805
806          @Override
807          public void cancel() {
808            cancelled.close();
809          }
810        };
811        s.onSubscribe(subs);
812
813      } else {
814        env.flop("TestPublisher doesn't support more than one Subscriber");
815      }
816    }
817
818    public void sendNext(T element) {
819      if (subscriber.isCompleted()) {
820        subscriber.value().onNext(element);
821      } else {
822        env.flop("Cannot sendNext before having a Subscriber");
823      }
824    }
825
826    public void sendCompletion() {
827      if (subscriber.isCompleted()) {
828        subscriber.value().onComplete();
829      } else {
830        env.flop("Cannot sendCompletion before having a Subscriber");
831      }
832    }
833
834    public void sendError(Throwable cause) {
835      if (subscriber.isCompleted()) {
836        subscriber.value().onError(cause);
837      } else {
838        env.flop("Cannot sendError before having a Subscriber");
839      }
840    }
841
842    public long expectRequest() throws InterruptedException {
843      return expectRequest(env.defaultTimeoutMillis());
844    }
845
846    public long expectRequest(long timeoutMillis) throws InterruptedException {
847      long requested = requests.next(timeoutMillis, "Did not receive expected `request` call");
848      if (requested <= 0) {
849        return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested));
850      } else {
851        pendingDemand += requested;
852        return requested;
853      }
854    }
855
856
857    public long expectRequest(long timeoutMillis, String errorMessageAddendum) throws InterruptedException {
858      long requested = requests.next(timeoutMillis, String.format("Did not receive expected `request` call. %s", errorMessageAddendum));
859      if (requested <= 0) {
860        return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested));
861      } else {
862        pendingDemand += requested;
863        return requested;
864      }
865    }
866
867    public void expectExactRequest(long expected) throws InterruptedException {
868      expectExactRequest(expected, env.defaultTimeoutMillis());
869    }
870
871    public void expectExactRequest(long expected, long timeoutMillis) throws InterruptedException {
872      long requested = expectRequest(timeoutMillis);
873      if (requested != expected) {
874        env.flop(String.format("Received `request(%d)` on upstream but expected `request(%d)`", requested, expected));
875      }
876      pendingDemand += requested;
877    }
878
879    public void expectNoRequest() throws InterruptedException {
880      expectNoRequest(env.defaultTimeoutMillis());
881    }
882
883    public void expectNoRequest(long timeoutMillis) throws InterruptedException {
884      requests.expectNone(timeoutMillis, "Received an unexpected call to: request: ");
885    }
886
887    public void expectCancelling() throws InterruptedException {
888      expectCancelling(env.defaultTimeoutMillis());
889    }
890
891    public void expectCancelling(long timeoutMillis) throws InterruptedException {
892      cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription");
893    }
894
895    public boolean isCancelled() throws InterruptedException {
896      return cancelled.isClosed();
897    }
898  }
899
900  /**
901   * Like a CountDownLatch, but resettable and with some convenience methods
902   */
903  public static class Latch {
904    private final TestEnvironment env;
905    volatile private CountDownLatch countDownLatch = new CountDownLatch(1);
906
907    public Latch(TestEnvironment env) {
908      this.env = env;
909    }
910
911    public void reOpen() {
912      countDownLatch = new CountDownLatch(1);
913    }
914
915    public boolean isClosed() {
916      return countDownLatch.getCount() == 0;
917    }
918
919    public void close() {
920      countDownLatch.countDown();
921    }
922
923    public void assertClosed(String openErrorMsg) {
924      if (!isClosed()) {
925        env.flop(new ExpectedClosedLatchException(openErrorMsg));
926      }
927    }
928
929    public void assertOpen(String closedErrorMsg) {
930      if (isClosed()) {
931        env.flop(new ExpectedOpenLatchException(closedErrorMsg));
932      }
933    }
934
935    public void expectClose(String notClosedErrorMsg) throws InterruptedException {
936      expectClose(env.defaultTimeoutMillis(), notClosedErrorMsg);
937    }
938
939    public void expectClose(long timeoutMillis, String notClosedErrorMsg) throws InterruptedException {
940      countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
941      if (countDownLatch.getCount() > 0) {
942        env.flop(String.format("%s within %d ms", notClosedErrorMsg, timeoutMillis));
943      }
944    }
945
946    static final class ExpectedOpenLatchException extends RuntimeException {
947      public ExpectedOpenLatchException(String message) {
948        super(message);
949      }
950    }
951
952    static final class ExpectedClosedLatchException extends RuntimeException {
953      public ExpectedClosedLatchException(String message) {
954        super(message);
955      }
956    }
957
958  }
959
960  // simple promise for *one* value, which cannot be reset
961  public static class Promise<T> {
962    private final TestEnvironment env;
963
964    public static <T> Promise<T> completed(TestEnvironment env, T value) {
965      Promise<T> promise = new Promise<T>(env);
966      promise.completeImmediatly(value);
967      return promise;
968    }
969
970    public Promise(TestEnvironment env) {
971      this.env = env;
972    }
973
974    private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1);
975    private AtomicReference<T> _value = new AtomicReference<T>();
976
977    public T value() {
978      final T value = _value.get();
979      if (value != null) {
980        return value;
981      } else {
982        env.flop("Cannot access promise value before completion");
983        return null;
984      }
985    }
986
987    public boolean isCompleted() {
988      return _value.get() != null;
989    }
990
991    /**
992     * Allows using expectCompletion to await for completion of the value and complete it _then_
993     */
994    public void complete(T value) {
995      if (_value.compareAndSet(null, value)) {
996        // we add the value to the queue such to wake up any expectCompletion which was triggered before complete() was called
997        abq.add(value);
998      } else {
999        env.flop(String.format("Cannot complete a promise more than once! Present value: %s, attempted to set: %s", _value.get(), value));
1000      }
1001    }
1002
1003    /**
1004     * Same as complete.
1005     *
1006     * Keeping this method for binary compatibility.
1007     */
1008    public void completeImmediatly(T value) {
1009      complete(value);
1010    }
1011
1012    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
1013      if (!isCompleted()) {
1014        T val = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
1015
1016        if (val == null) {
1017          env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
1018        }
1019      }
1020    }
1021  }
1022
1023  // a "Promise" for multiple values, which also supports "end-of-stream reached"
1024  public static class Receptacle<T> {
1025    final int QUEUE_SIZE = 2 * TEST_BUFFER_SIZE;
1026    private final TestEnvironment env;
1027
1028    private final ArrayBlockingQueue<Optional<T>> abq = new ArrayBlockingQueue<Optional<T>>(QUEUE_SIZE);
1029
1030    private final Latch completedLatch;
1031
1032    Receptacle(TestEnvironment env) {
1033      this.env = env;
1034      this.completedLatch = new Latch(env);
1035    }
1036
1037    public void add(T value) {
1038      completedLatch.assertOpen(String.format("Unexpected element %s received after stream completed", value));
1039
1040      abq.add(Optional.of(value));
1041    }
1042
1043    public void complete() {
1044      completedLatch.assertOpen("Unexpected additional complete signal received!");
1045      completedLatch.close();
1046
1047      abq.add(Optional.<T>empty());
1048    }
1049
1050    public T next(long timeoutMillis, String errorMsg) throws InterruptedException {
1051      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
1052
1053      if (value == null) {
1054        return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis));
1055      } else if (value.isDefined()) {
1056        return value.get();
1057      } else {
1058        return env.flopAndFail("Expected element but got end-of-stream");
1059      }
1060    }
1061
1062    public Optional<T> nextOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
1063      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
1064
1065      if (value == null) {
1066        env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
1067        return Optional.empty();
1068      }
1069
1070      return value;
1071    }
1072
1073    /**
1074     * @param timeoutMillis total timeout time for awaiting all {@code elements} number of elements
1075     */
1076    public List<T> nextN(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
1077      List<T> result = new LinkedList<T>();
1078      long remaining = elements;
1079      long deadline = System.currentTimeMillis() + timeoutMillis;
1080      while (remaining > 0) {
1081        long remainingMillis = deadline - System.currentTimeMillis();
1082
1083        result.add(next(remainingMillis, errorMsg));
1084        remaining--;
1085      }
1086
1087      return result;
1088    }
1089
1090
1091    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
1092      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
1093
1094      if (value == null) {
1095        env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
1096      } else if (value.isDefined()) {
1097        env.flop(String.format("Expected end-of-stream but got element [%s]", value.get()));
1098      } // else, ok
1099    }
1100
1101    /**
1102     * @deprecated Deprecated in favor of {@link #expectError(Class, long, long, String)}.
1103     */
1104    @Deprecated
1105    public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception {
1106      return expectError(clazz, timeoutMillis, timeoutMillis, errorMsg);
1107    }
1108
1109    @SuppressWarnings("unchecked")
1110    final <E extends Throwable> E expectError(Class<E> clazz, final long totalTimeoutMillis,
1111                                              long pollTimeoutMillis,
1112                                              String errorMsg) throws Exception {
1113      long totalTimeoutRemainingNs = MILLISECONDS.toNanos(totalTimeoutMillis);
1114      long timeStampANs = System.nanoTime();
1115      long timeStampBNs;
1116
1117      for (;;) {
1118        Thread.sleep(Math.min(pollTimeoutMillis, NANOSECONDS.toMillis(totalTimeoutRemainingNs)));
1119
1120        if (env.asyncErrors.isEmpty()) {
1121          timeStampBNs = System.nanoTime();
1122          totalTimeoutRemainingNs =- timeStampBNs - timeStampANs;
1123          timeStampANs = timeStampBNs;
1124
1125          if (totalTimeoutRemainingNs <= 0) {
1126            return env.flopAndFail(String.format("%s within %d ms", errorMsg, totalTimeoutMillis));
1127          }
1128        } else {
1129          // ok, there was an expected error
1130          Throwable thrown = env.asyncErrors.remove(0);
1131
1132          if (clazz.isInstance(thrown)) {
1133            return (E) thrown;
1134          } else {
1135
1136            return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s",
1137                    errorMsg, totalTimeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName()));
1138          }
1139        }
1140      }
1141    }
1142
1143    public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException {
1144      Thread.sleep(withinMillis);
1145      Optional<T> value = abq.poll();
1146
1147      if (value == null) {
1148        // ok
1149      } else if (value.isDefined()) {
1150        env.flop(String.format("%s [%s]", errorMsgPrefix, value.get()));
1151      } else {
1152        env.flop("Expected no element but got end-of-stream");
1153      }
1154    }
1155  }
1156}
1157