001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck;
013
014import org.reactivestreams.Publisher;
015import org.reactivestreams.Subscriber;
016import org.reactivestreams.Subscription;
017import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException;
018import org.reactivestreams.tck.flow.support.Optional;
019
020import java.util.Collections;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.concurrent.ArrayBlockingQueue;
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.TimeUnit;
027
028import static org.testng.Assert.assertTrue;
029import static org.testng.Assert.fail;
030
031public class TestEnvironment {
032  public static final int TEST_BUFFER_SIZE = 16;
033
034  private static final String DEFAULT_TIMEOUT_MILLIS_ENV = "DEFAULT_TIMEOUT_MILLIS";
035  private static final long DEFAULT_TIMEOUT_MILLIS = 100;
036
037  private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS";
038
039  private final long defaultTimeoutMillis;
040  private final long defaultNoSignalsTimeoutMillis;
041  private final boolean printlnDebug;
042
043  private CopyOnWriteArrayList<Throwable> asyncErrors = new CopyOnWriteArrayList<Throwable>();
044
045  /**
046   * Tests must specify the timeout for expected outcome of asynchronous
047   * interactions. Longer timeout does not invalidate the correctness of
048   * the implementation, but can in some cases result in longer time to
049   * run the tests.
050   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
051   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
052   * @param printlnDebug         if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
053   */
054  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) {
055    this.defaultTimeoutMillis = defaultTimeoutMillis;
056    this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis;
057    this.printlnDebug = printlnDebug;
058  }
059
060  /**
061   * Tests must specify the timeout for expected outcome of asynchronous
062   * interactions. Longer timeout does not invalidate the correctness of
063   * the implementation, but can in some cases result in longer time to
064   * run the tests.
065   *
066   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
067   * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
068   */
069  public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) {
070    this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, false);
071  }
072
073  /**
074   * Tests must specify the timeout for expected outcome of asynchronous
075   * interactions. Longer timeout does not invalidate the correctness of
076   * the implementation, but can in some cases result in longer time to
077   * run the tests.
078   *
079   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
080   */
081  public TestEnvironment(long defaultTimeoutMillis) {
082    this(defaultTimeoutMillis, defaultTimeoutMillis, false);
083  }
084
085  /**
086   * Tests must specify the timeout for expected outcome of asynchronous
087   * interactions. Longer timeout does not invalidate the correctness of
088   * the implementation, but can in some cases result in longer time to
089   * run the tests.
090   *
091   * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS}
092   * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used.
093   *
094   * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
095   *                     often helpful to pinpoint simple race conditions etc.
096   */
097  public TestEnvironment(boolean printlnDebug) {
098    this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), printlnDebug);
099  }
100
101  /**
102   * Tests must specify the timeout for expected outcome of asynchronous
103   * interactions. Longer timeout does not invalidate the correctness of
104   * the implementation, but can in some cases result in longer time to
105   * run the tests.
106   *
107   * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS}
108   * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used.
109   */
110  public TestEnvironment() {
111    this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis());
112  }
113
114  /** This timeout is used when waiting for a signal to arrive. */
115  public long defaultTimeoutMillis() {
116    return defaultTimeoutMillis;
117  }
118
119  /**
120   * This timeout is used when asserting that no further signals are emitted.
121   * Note that this timeout default
122   */
123  public long defaultNoSignalsTimeoutMillis() {
124    return defaultNoSignalsTimeoutMillis;
125  }
126
127  /**
128   * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
129   *
130   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
131   */
132  public static long envDefaultTimeoutMillis() {
133    final String envMillis = System.getenv(DEFAULT_TIMEOUT_MILLIS_ENV);
134    if (envMillis == null) return DEFAULT_TIMEOUT_MILLIS;
135    else try {
136      return Long.parseLong(envMillis);
137    } catch (NumberFormatException ex) {
138      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_TIMEOUT_MILLIS_ENV, envMillis), ex);
139    }
140  }
141
142  /**
143   * Tries to parse the env variable {@code DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
144   *
145   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
146   */
147  public static long envDefaultNoSignalsTimeoutMillis() {
148    final String envMillis = System.getenv(DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV);
149    if (envMillis == null) return envDefaultTimeoutMillis();
150    else try {
151      return Long.parseLong(envMillis);
152    } catch (NumberFormatException ex) {
153      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV, envMillis), ex);
154    }
155  }
156
157  /**
158   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
159   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
160   *
161   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
162   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
163   * from the environment using {@code env.dropAsyncError()}.
164   *
165   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
166   */
167  public void flop(String msg) {
168    try {
169      fail(msg);
170    } catch (Throwable t) {
171      asyncErrors.add(t);
172    }
173  }
174
175  /**
176   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
177   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
178   *
179   * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
180   *
181   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
182   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
183   * from the environment using {@code env.dropAsyncError()}.
184   *
185   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
186   */
187  public void flop(Throwable thr, String msg) {
188    try {
189      fail(msg, thr);
190    } catch (Throwable t) {
191      asyncErrors.add(thr);
192    }
193  }
194  
195  /**
196   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
197   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
198   *
199   * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
200   *
201   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
202   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
203   * from the environment using {@code env.dropAsyncError()}.
204   *
205   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
206   */
207  public void flop(Throwable thr) {
208    try {
209      fail(thr.getMessage(), thr);
210    } catch (Throwable t) {
211      asyncErrors.add(thr);
212    }
213  }
214
215  /**
216   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
217   *
218   * This method DOES fail the test right away (it tries to, by throwing an AssertionException),
219   * in such it is different from {@link org.reactivestreams.tck.TestEnvironment#flop} which only records the error.
220   *
221   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
222   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
223   * from the environment using {@code env.dropAsyncError()}.
224   *
225   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
226   */
227  public <T> T flopAndFail(String msg) {
228    try {
229      fail(msg);
230    } catch (Throwable t) {
231      asyncErrors.add(t);
232      fail(msg, t);
233    }
234    return null; // unreachable, the previous block will always exit by throwing
235  }
236
237
238
239  public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
240    subscribe(pub, sub, defaultTimeoutMillis);
241  }
242
243  public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException {
244    pub.subscribe(sub);
245    sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub));
246    verifyNoAsyncErrorsNoDelay();
247  }
248
249  public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException {
250    ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this);
251    subscribe(pub, sub, defaultTimeoutMillis());
252    return sub;
253  }
254
255  public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException {
256    return newManualSubscriber(pub, defaultTimeoutMillis());
257  }
258
259  public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException {
260    ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this);
261    subscribe(pub, sub, timeoutMillis);
262    return sub;
263  }
264
265  public void clearAsyncErrors() {
266    asyncErrors.clear();
267  }
268
269  public Throwable dropAsyncError() {
270    try {
271      return asyncErrors.remove(0);
272    } catch (IndexOutOfBoundsException ex) {
273      return null;
274    }
275  }
276
277  /**
278   * Waits for {@link TestEnvironment#defaultTimeoutMillis()} and then verifies that no asynchronous errors
279   * were signalled pior to, or during that time (by calling {@code flop()}).
280   */
281  public void verifyNoAsyncErrors() {
282    verifyNoAsyncErrors(defaultNoSignalsTimeoutMillis());
283  }
284
285  /**
286   * This version of {@code verifyNoAsyncErrors} should be used when errors still could be signalled
287   * asynchronously during {@link TestEnvironment#defaultTimeoutMillis()} time.
288   * <p></p>
289   * It will immediatly check if any async errors were signaled (using {@link TestEnvironment#flop(String)},
290   * and if no errors encountered wait for another default timeout as the errors may yet be signalled.
291   * The initial check is performed in order to fail-fast in case of an already failed test.
292   */
293  public void verifyNoAsyncErrors(long delay) {
294    try {
295      verifyNoAsyncErrorsNoDelay();
296
297      Thread.sleep(delay);
298      verifyNoAsyncErrorsNoDelay();
299    } catch (InterruptedException e) {
300      throw new RuntimeException(e);
301    }
302  }
303
304  /**
305   * Verifies that no asynchronous errors were signalled pior to calling this method (by calling {@code flop()}).
306   * This version of verifyNoAsyncError <b>does not wait before checking for asynchronous errors</b>, and is to be used
307   * for example in tight loops etc.
308   */
309  public void verifyNoAsyncErrorsNoDelay() {
310    for (Throwable e : asyncErrors) {
311      if (e instanceof AssertionError) {
312        throw (AssertionError) e;
313      } else {
314        fail(String.format("Async error during test execution: %s", e.getMessage()), e);
315      }
316    }
317  }
318
319  /** If {@code TestEnvironment#printlnDebug} is true, print debug message to std out. */
320  public void debug(String msg) {
321    if (printlnDebug)
322      System.out.printf("[TCK-DEBUG] %s%n", msg);
323  }
324
325  /**
326   * Looks for given {@code method} method in stack trace.
327   * Can be used to answer questions like "was this method called from onComplete?".
328   *
329   * @return the caller's StackTraceElement at which he the looked for method was found in the call stack, EMPTY otherwise
330   */
331  public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) {
332    final Throwable thr = new Throwable(); // gets the stacktrace
333
334    for (StackTraceElement stackElement : thr.getStackTrace()) {
335      if (stackElement.getMethodName().equals(method)) {
336        return Optional.of(stackElement);
337      }
338    }
339    return Optional.empty();
340  }
341
342  // ---- classes ----
343
344  /**
345   * {@link Subscriber} implementation which can be steered by test code and asserted on.
346   */
347  public static class ManualSubscriber<T> extends TestSubscriber<T> {
348    Receptacle<T> received;
349
350    public ManualSubscriber(TestEnvironment env) {
351      super(env);
352      received = new Receptacle<T>(this.env);
353    }
354
355    @Override
356    public void onNext(T element) {
357      try {
358        received.add(element);
359      } catch (IllegalStateException ex) {
360          // error message refinement
361          throw new SubscriberBufferOverflowException(
362            String.format("Received more than bufferSize (%d) onNext signals. " +
363                            "The Publisher probably emited more signals than expected!",
364                          received.QUEUE_SIZE), ex);
365      }
366    }
367
368    @Override
369    public void onComplete() {
370      received.complete();
371    }
372
373    public void request(long elements) {
374      subscription.value().request(elements);
375    }
376
377    public T requestNextElement() throws InterruptedException {
378      return requestNextElement(env.defaultTimeoutMillis());
379    }
380
381    public T requestNextElement(long timeoutMillis) throws InterruptedException {
382      return requestNextElement(timeoutMillis, "Did not receive expected element");
383    }
384
385    public T requestNextElement(String errorMsg) throws InterruptedException {
386      return requestNextElement(env.defaultTimeoutMillis(), errorMsg);
387    }
388
389    public T requestNextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
390      request(1);
391      return nextElement(timeoutMillis, errorMsg);
392    }
393
394    public Optional<T> requestNextElementOrEndOfStream() throws InterruptedException {
395      return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
396    }
397
398    public Optional<T> requestNextElementOrEndOfStream(String errorMsg) throws InterruptedException {
399      return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), errorMsg);
400    }
401
402    public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis) throws InterruptedException {
403      return requestNextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion");
404    }
405
406    public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
407      request(1);
408      return nextElementOrEndOfStream(timeoutMillis, errorMsg);
409    }
410
411    public void requestEndOfStream() throws InterruptedException {
412      requestEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
413    }
414
415    public void requestEndOfStream(long timeoutMillis) throws InterruptedException {
416      requestEndOfStream(timeoutMillis, "Did not receive expected stream completion");
417    }
418
419    public void requestEndOfStream(String errorMsg) throws InterruptedException {
420      requestEndOfStream(env.defaultTimeoutMillis(), errorMsg);
421    }
422
423    public void requestEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
424      request(1);
425      expectCompletion(timeoutMillis, errorMsg);
426    }
427
428    public List<T> requestNextElements(long elements) throws InterruptedException {
429      request(elements);
430      return nextElements(elements, env.defaultTimeoutMillis());
431    }
432
433    public List<T> requestNextElements(long elements, long timeoutMillis) throws InterruptedException {
434      request(elements);
435      return nextElements(elements, timeoutMillis, String.format("Did not receive %d expected elements", elements));
436    }
437
438    public List<T> requestNextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
439      request(elements);
440      return nextElements(elements, timeoutMillis, errorMsg);
441    }
442
443    public T nextElement() throws InterruptedException {
444      return nextElement(env.defaultTimeoutMillis());
445    }
446
447    public T nextElement(long timeoutMillis) throws InterruptedException {
448      return nextElement(timeoutMillis, "Did not receive expected element");
449    }
450
451    public T nextElement(String errorMsg) throws InterruptedException {
452      return nextElement(env.defaultTimeoutMillis(), errorMsg);
453    }
454
455    public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
456      return received.next(timeoutMillis, errorMsg);
457    }
458
459    public Optional<T> nextElementOrEndOfStream() throws InterruptedException {
460      return nextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
461    }
462
463    public Optional<T> nextElementOrEndOfStream(long timeoutMillis) throws InterruptedException {
464      return nextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion");
465    }
466
467    public Optional<T> nextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
468      return received.nextOrEndOfStream(timeoutMillis, errorMsg);
469    }
470
471    public List<T> nextElements(long elements) throws InterruptedException {
472      return nextElements(elements, env.defaultTimeoutMillis(), "Did not receive expected element or completion");
473    }
474
475    public List<T> nextElements(long elements, String errorMsg) throws InterruptedException {
476      return nextElements(elements, env.defaultTimeoutMillis(), errorMsg);
477    }
478
479    public List<T> nextElements(long elements, long timeoutMillis) throws InterruptedException {
480      return nextElements(elements, timeoutMillis, "Did not receive expected element or completion");
481    }
482
483    public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
484      return received.nextN(elements, timeoutMillis, errorMsg);
485    }
486
487    public void expectNext(T expected) throws InterruptedException {
488      expectNext(expected, env.defaultTimeoutMillis());
489    }
490
491    public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
492      T received = nextElement(timeoutMillis, "Did not receive expected element on downstream");
493      if (!received.equals(expected)) {
494        env.flop(String.format("Expected element %s on downstream but received %s", expected, received));
495      }
496    }
497
498    public void expectCompletion() throws InterruptedException {
499      expectCompletion(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
500    }
501
502    public void expectCompletion(long timeoutMillis) throws InterruptedException {
503      expectCompletion(timeoutMillis, "Did not receive expected stream completion");
504    }
505
506    public void expectCompletion(String errorMsg) throws InterruptedException {
507      expectCompletion(env.defaultTimeoutMillis(), errorMsg);
508    }
509
510    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
511      received.expectCompletion(timeoutMillis, errorMsg);
512    }
513
514    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception {
515      expectErrorWithMessage(expected, requiredMessagePart, env.defaultTimeoutMillis());
516    }
517    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives) throws Exception {
518      expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis());
519    }
520
521    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
522    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception {
523      expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), timeoutMillis);
524    }
525    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, List<String> requiredMessagePartAlternatives, long timeoutMillis) throws Exception {
526      final E err = expectError(expected, timeoutMillis);
527      final String message = err.getMessage();
528      
529      boolean contains = false;
530      for (String requiredMessagePart : requiredMessagePartAlternatives) 
531        if (message.contains(requiredMessagePart)) contains = true; // not short-circuting loop, it is expected to
532      assertTrue(contains,
533                 String.format("Got expected exception [%s] but missing message part [%s], was: %s",
534                               err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage()));
535    }
536
537    public <E extends Throwable> E expectError(Class<E> expected) throws Exception {
538      return expectError(expected, env.defaultTimeoutMillis());
539    }
540
541    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception {
542      return expectError(expected, timeoutMillis, String.format("Expected onError(%s)", expected.getName()));
543    }
544
545    public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception {
546      return expectError(expected, env.defaultTimeoutMillis(), errorMsg);
547    }
548
549    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception {
550      return received.expectError(expected, timeoutMillis, errorMsg);
551    }
552
553    public void expectNone() throws InterruptedException {
554      expectNone(env.defaultNoSignalsTimeoutMillis());
555    }
556
557    public void expectNone(String errMsgPrefix) throws InterruptedException {
558      expectNone(env.defaultNoSignalsTimeoutMillis(), errMsgPrefix);
559    }
560
561    public void expectNone(long withinMillis) throws InterruptedException {
562      expectNone(withinMillis, "Did not expect an element but got element");
563    }
564
565    public void expectNone(long withinMillis, String errMsgPrefix) throws InterruptedException {
566      received.expectNone(withinMillis, errMsgPrefix);
567    }
568
569  }
570
571  public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> {
572
573    public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) {
574      super(env);
575    }
576
577    @Override
578    public void onNext(T element) {
579      env.debug(String.format("%s::onNext(%s)", this, element));
580      if (subscription.isCompleted()) {
581        super.onNext(element);
582      } else {
583        env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
584      }
585    }
586
587    @Override
588    public void onComplete() {
589      env.debug(this + "::onComplete()");
590      if (subscription.isCompleted()) {
591        super.onComplete();
592      } else {
593        env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe");
594      }
595    }
596
597    @Override
598    public void onSubscribe(Subscription s) {
599      env.debug(String.format("%s::onSubscribe(%s)", this, s));
600      if (!subscription.isCompleted()) {
601        subscription.complete(s);
602      } else {
603        env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber");
604      }
605    }
606
607    @Override
608    public void onError(Throwable cause) {
609      env.debug(String.format("%s::onError(%s)", this, cause));
610      if (subscription.isCompleted()) {
611        super.onError(cause);
612      } else {
613        env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause));
614      }
615    }
616  }
617
618  /**
619   * Similar to {@link org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport}
620   * but does not accumulate values signalled via <code>onNext</code>, thus it can not be used to assert
621   * values signalled to this subscriber. Instead it may be used to quickly drain a given publisher.
622   */
623  public static class BlackholeSubscriberWithSubscriptionSupport<T>
624    extends ManualSubscriberWithSubscriptionSupport<T> {
625
626    public BlackholeSubscriberWithSubscriptionSupport(TestEnvironment env) {
627      super(env);
628    }
629
630    @Override
631    public void onNext(T element) {
632      env.debug(String.format("%s::onNext(%s)", this, element));
633      if (!subscription.isCompleted()) {
634        env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
635      }
636    }
637
638    @Override
639    public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
640      throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!");
641    }
642
643    @Override
644    public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
645      throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!");
646    }
647  }
648
649  public static class TestSubscriber<T> implements Subscriber<T> {
650    final Promise<Subscription> subscription;
651
652    protected final TestEnvironment env;
653
654    public TestSubscriber(TestEnvironment env) {
655      this.env = env;
656      subscription = new Promise<Subscription>(env);
657    }
658
659    @Override
660    public void onError(Throwable cause) {
661      env.flop(cause, String.format("Unexpected Subscriber::onError(%s)", cause));
662    }
663
664    @Override
665    public void onComplete() {
666      env.flop("Unexpected Subscriber::onComplete()");
667    }
668
669    @Override
670    public void onNext(T element) {
671      env.flop(String.format("Unexpected Subscriber::onNext(%s)", element));
672    }
673
674    @Override
675    public void onSubscribe(Subscription subscription) {
676      env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription));
677    }
678
679    public void cancel() {
680      if (subscription.isCompleted()) {
681        subscription.value().cancel();
682      } else {
683        env.flop("Cannot cancel a subscription before having received it");
684      }
685    }
686  }
687
688  public static class ManualPublisher<T> implements Publisher<T> {
689    protected final TestEnvironment env;
690
691    protected long pendingDemand = 0L;
692    protected Promise<Subscriber<? super T>> subscriber;
693
694    protected final Receptacle<Long> requests;
695
696    protected final Latch cancelled;
697
698    public ManualPublisher(TestEnvironment env) {
699      this.env = env;
700      requests = new Receptacle<Long>(env);
701      cancelled = new Latch(env);
702      subscriber = new Promise<Subscriber<? super T>>(this.env);
703    }
704
705    @Override
706    public void subscribe(Subscriber<? super T> s) {
707      if (!subscriber.isCompleted()) {
708        subscriber.completeImmediatly(s);
709
710        Subscription subs = new Subscription() {
711          @Override
712          public void request(long elements) {
713            requests.add(elements);
714          }
715
716          @Override
717          public void cancel() {
718            cancelled.close();
719          }
720        };
721        s.onSubscribe(subs);
722
723      } else {
724        env.flop("TestPublisher doesn't support more than one Subscriber");
725      }
726    }
727
728    public void sendNext(T element) {
729      if (subscriber.isCompleted()) {
730        subscriber.value().onNext(element);
731      } else {
732        env.flop("Cannot sendNext before having a Subscriber");
733      }
734    }
735
736    public void sendCompletion() {
737      if (subscriber.isCompleted()) {
738        subscriber.value().onComplete();
739      } else {
740        env.flop("Cannot sendCompletion before having a Subscriber");
741      }
742    }
743
744    public void sendError(Throwable cause) {
745      if (subscriber.isCompleted()) {
746        subscriber.value().onError(cause);
747      } else {
748        env.flop("Cannot sendError before having a Subscriber");
749      }
750    }
751
752    public long expectRequest() throws InterruptedException {
753      return expectRequest(env.defaultTimeoutMillis());
754    }
755
756    public long expectRequest(long timeoutMillis) throws InterruptedException {
757      long requested = requests.next(timeoutMillis, "Did not receive expected `request` call");
758      if (requested <= 0) {
759        return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested));
760      } else {
761        pendingDemand += requested;
762        return requested;
763      }
764    }
765
766
767    public long expectRequest(long timeoutMillis, String errorMessageAddendum) throws InterruptedException {
768      long requested = requests.next(timeoutMillis, String.format("Did not receive expected `request` call. %s", errorMessageAddendum));
769      if (requested <= 0) {
770        return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested));
771      } else {
772        pendingDemand += requested;
773        return requested;
774      }
775    }
776
777    public void expectExactRequest(long expected) throws InterruptedException {
778      expectExactRequest(expected, env.defaultTimeoutMillis());
779    }
780
781    public void expectExactRequest(long expected, long timeoutMillis) throws InterruptedException {
782      long requested = expectRequest(timeoutMillis);
783      if (requested != expected) {
784        env.flop(String.format("Received `request(%d)` on upstream but expected `request(%d)`", requested, expected));
785      }
786      pendingDemand += requested;
787    }
788
789    public void expectNoRequest() throws InterruptedException {
790      expectNoRequest(env.defaultTimeoutMillis());
791    }
792
793    public void expectNoRequest(long timeoutMillis) throws InterruptedException {
794      requests.expectNone(timeoutMillis, "Received an unexpected call to: request: ");
795    }
796
797    public void expectCancelling() throws InterruptedException {
798      expectCancelling(env.defaultTimeoutMillis());
799    }
800
801    public void expectCancelling(long timeoutMillis) throws InterruptedException {
802      cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription");
803    }
804    
805    public boolean isCancelled() throws InterruptedException {
806      return cancelled.isClosed();
807    }
808  }
809
810  /**
811   * Like a CountDownLatch, but resettable and with some convenience methods
812   */
813  public static class Latch {
814    private final TestEnvironment env;
815    volatile private CountDownLatch countDownLatch = new CountDownLatch(1);
816
817    public Latch(TestEnvironment env) {
818      this.env = env;
819    }
820
821    public void reOpen() {
822      countDownLatch = new CountDownLatch(1);
823    }
824
825    public boolean isClosed() {
826      return countDownLatch.getCount() == 0;
827    }
828
829    public void close() {
830      countDownLatch.countDown();
831    }
832
833    public void assertClosed(String openErrorMsg) {
834      if (!isClosed()) {
835        env.flop(new ExpectedClosedLatchException(openErrorMsg));
836      }
837    }
838
839    public void assertOpen(String closedErrorMsg) {
840      if (isClosed()) {
841        env.flop(new ExpectedOpenLatchException(closedErrorMsg));
842      }
843    }
844
845    public void expectClose(String notClosedErrorMsg) throws InterruptedException {
846      expectClose(env.defaultTimeoutMillis(), notClosedErrorMsg);
847    }
848
849    public void expectClose(long timeoutMillis, String notClosedErrorMsg) throws InterruptedException {
850      countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
851      if (countDownLatch.getCount() > 0) {
852        env.flop(String.format("%s within %d ms", notClosedErrorMsg, timeoutMillis));
853      }
854    }
855
856    static final class ExpectedOpenLatchException extends RuntimeException {
857      public ExpectedOpenLatchException(String message) {
858        super(message);
859      }
860    }
861
862    static final class ExpectedClosedLatchException extends RuntimeException {
863      public ExpectedClosedLatchException(String message) {
864        super(message);
865      }
866    }
867
868  }
869
870  // simple promise for *one* value, which cannot be reset
871  public static class Promise<T> {
872    private final TestEnvironment env;
873
874    public static <T> Promise<T> completed(TestEnvironment env, T value) {
875      Promise<T> promise = new Promise<T>(env);
876      promise.completeImmediatly(value);
877      return promise;
878    }
879
880    public Promise(TestEnvironment env) {
881      this.env = env;
882    }
883
884    private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1);
885    private volatile T _value = null;
886
887    public T value() {
888      if (isCompleted()) {
889        return _value;
890      } else {
891        env.flop("Cannot access promise value before completion");
892        return null;
893      }
894    }
895
896    public boolean isCompleted() {
897      return _value != null;
898    }
899
900    /**
901     * Allows using expectCompletion to await for completion of the value and complete it _then_
902     */
903    public void complete(T value) {
904      abq.add(value);
905    }
906
907    /**
908     * Completes the promise right away, it is not possible to expectCompletion on a Promise completed this way
909     */
910    public void completeImmediatly(T value) {
911      complete(value); // complete!
912      _value = value;  // immediatly!
913    }
914
915    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
916      if (!isCompleted()) {
917        T val = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
918
919        if (val == null) {
920          env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
921        } else {
922          _value = val;
923        }
924      }
925    }
926  }
927
928  // a "Promise" for multiple values, which also supports "end-of-stream reached"
929  public static class Receptacle<T> {
930    final int QUEUE_SIZE = 2 * TEST_BUFFER_SIZE;
931    private final TestEnvironment env;
932
933    private final ArrayBlockingQueue<Optional<T>> abq = new ArrayBlockingQueue<Optional<T>>(QUEUE_SIZE);
934
935    private final Latch completedLatch;
936
937    Receptacle(TestEnvironment env) {
938      this.env = env;
939      this.completedLatch = new Latch(env);
940    }
941
942    public void add(T value) {
943      completedLatch.assertOpen(String.format("Unexpected element %s received after stream completed", value));
944
945      abq.add(Optional.of(value));
946    }
947
948    public void complete() {
949      completedLatch.assertOpen("Unexpected additional complete signal received!");
950      completedLatch.close();
951
952      abq.add(Optional.<T>empty());
953    }
954
955    public T next(long timeoutMillis, String errorMsg) throws InterruptedException {
956      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
957
958      if (value == null) {
959        return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis));
960      } else if (value.isDefined()) {
961        return value.get();
962      } else {
963        return env.flopAndFail("Expected element but got end-of-stream");
964      }
965    }
966
967    public Optional<T> nextOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
968      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
969
970      if (value == null) {
971        env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
972        return Optional.empty();
973      }
974
975      return value;
976    }
977
978    /**
979     * @param timeoutMillis total timeout time for awaiting all {@code elements} number of elements
980     */
981    public List<T> nextN(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
982      List<T> result = new LinkedList<T>();
983      long remaining = elements;
984      long deadline = System.currentTimeMillis() + timeoutMillis;
985      while (remaining > 0) {
986        long remainingMillis = deadline - System.currentTimeMillis();
987
988        result.add(next(remainingMillis, errorMsg));
989        remaining--;
990      }
991
992      return result;
993    }
994
995
996    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
997      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
998
999      if (value == null) {
1000        env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
1001      } else if (value.isDefined()) {
1002        env.flop(String.format("Expected end-of-stream but got element [%s]", value.get()));
1003      } // else, ok
1004    }
1005
1006    @SuppressWarnings("unchecked")
1007    public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception {
1008      Thread.sleep(timeoutMillis);
1009
1010      if (env.asyncErrors.isEmpty()) {
1011        return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis));
1012      } else {
1013        // ok, there was an expected error
1014        Throwable thrown = env.asyncErrors.remove(0);
1015
1016        if (clazz.isInstance(thrown)) {
1017          return (E) thrown;
1018        } else {
1019
1020          return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s",
1021                                               errorMsg, timeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName()));
1022        }
1023      }
1024    }
1025
1026    public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException {
1027      Thread.sleep(withinMillis);
1028      Optional<T> value = abq.poll();
1029
1030      if (value == null) {
1031        // ok
1032      } else if (value.isDefined()) {
1033        env.flop(String.format("%s [%s]", errorMsgPrefix, value.get()));
1034      } else {
1035        env.flop("Expected no element but got end-of-stream");
1036      }
1037    }
1038  }
1039}
1040