001package org.reactivestreams.tck;
002
003import org.reactivestreams.Publisher;
004import org.reactivestreams.Subscriber;
005import org.reactivestreams.Subscription;
006import org.reactivestreams.tck.support.SubscriberBufferOverflowException;
007import org.reactivestreams.tck.support.Optional;
008
009import java.util.LinkedList;
010import java.util.List;
011import java.util.concurrent.ArrayBlockingQueue;
012import java.util.concurrent.CopyOnWriteArrayList;
013import java.util.concurrent.CountDownLatch;
014import java.util.concurrent.TimeUnit;
015
016import static org.testng.Assert.assertTrue;
017import static org.testng.Assert.fail;
018
019public class TestEnvironment {
020  public static final int TEST_BUFFER_SIZE = 16;
021
022  private static final String DEFAULT_TIMEOUT_MILLIS_ENV = "DEFAULT_TIMEOUT_MILLIS";
023  private static final long DEFAULT_TIMEOUT_MILLIS = 100;
024
025  private final long defaultTimeoutMillis;
026  private final boolean printlnDebug;
027
028  private CopyOnWriteArrayList<Throwable> asyncErrors = new CopyOnWriteArrayList<Throwable>();
029
030  /**
031   * Tests must specify the timeout for expected outcome of asynchronous
032   * interactions. Longer timeout does not invalidate the correctness of
033   * the implementation, but can in some cases result in longer time to
034   * run the tests.
035   *
036   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
037   * @param printlnDebug         if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
038   *                             often helpful to pinpoint simple race conditions etc.
039   */
040  public TestEnvironment(long defaultTimeoutMillis, boolean printlnDebug) {
041    this.defaultTimeoutMillis = defaultTimeoutMillis;
042    this.printlnDebug = printlnDebug;
043  }
044
045  /**
046   * Tests must specify the timeout for expected outcome of asynchronous
047   * interactions. Longer timeout does not invalidate the correctness of
048   * the implementation, but can in some cases result in longer time to
049   * run the tests.
050   *
051   * @param defaultTimeoutMillis default timeout to be used in all expect* methods
052   */
053  public TestEnvironment(long defaultTimeoutMillis) {
054    this(defaultTimeoutMillis, false);
055  }
056
057  /**
058   * Tests must specify the timeout for expected outcome of asynchronous
059   * interactions. Longer timeout does not invalidate the correctness of
060   * the implementation, but can in some cases result in longer time to
061   * run the tests.
062   *
063   * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS}
064   * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used.
065   *
066   * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
067   *                     often helpful to pinpoint simple race conditions etc.
068   */
069  public TestEnvironment(boolean printlnDebug) {
070    this(envDefaultTimeoutMillis(), printlnDebug);
071  }
072
073  /**
074   * Tests must specify the timeout for expected outcome of asynchronous
075   * interactions. Longer timeout does not invalidate the correctness of
076   * the implementation, but can in some cases result in longer time to
077   * run the tests.
078   *
079   * The default timeout for all expect* methods will be obtained by either the env variable {@code DEFAULT_TIMEOUT_MILLIS}
080   * or the default value ({@link TestEnvironment#DEFAULT_TIMEOUT_MILLIS}) will be used.
081   */
082  public TestEnvironment() {
083    this(envDefaultTimeoutMillis());
084  }
085
086  public long defaultTimeoutMillis() {
087    return defaultTimeoutMillis;
088  }
089
090  /**
091   * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
092   *
093   * @throws java.lang.IllegalArgumentException when unable to parse the env variable
094   */
095  public static long envDefaultTimeoutMillis() {
096    final String envMillis = System.getenv(DEFAULT_TIMEOUT_MILLIS_ENV);
097    if (envMillis == null) return DEFAULT_TIMEOUT_MILLIS;
098    else try {
099      return Long.parseLong(envMillis);
100    } catch(NumberFormatException ex) {
101      throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_TIMEOUT_MILLIS_ENV, envMillis), ex);
102    }
103  }
104
105  /**
106   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
107   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
108   *
109   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
110   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
111   * from the environment using {@code env.dropAsyncError()}.
112   *
113   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
114   */
115  public void flop(String msg) {
116    try {
117      fail(msg);
118    } catch (Throwable t) {
119      asyncErrors.add(t);
120    }
121  }
122
123  /**
124   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
125   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
126   *
127   * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
128   *
129   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
130   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
131   * from the environment using {@code env.dropAsyncError()}.
132   *
133   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
134   */
135  public void flop(Throwable thr, String msg) {
136    try {
137      fail(msg, thr);
138    } catch (Throwable t) {
139      asyncErrors.add(thr);
140    }
141  }
142  
143  /**
144   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
145   * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
146   *
147   * This overload keeps the passed in throwable as the asyncError, instead of creating an AssertionError for this.
148   *
149   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
150   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
151   * from the environment using {@code env.dropAsyncError()}.
152   *
153   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
154   */
155  public void flop(Throwable thr) {
156    try {
157      fail(thr.getMessage(), thr);
158    } catch (Throwable t) {
159      asyncErrors.add(thr);
160    }
161  }
162
163  /**
164   * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
165   *
166   * This method DOES fail the test right away (it tries to, by throwing an AssertionException),
167   * in such it is different from {@link org.reactivestreams.tck.TestEnvironment#flop} which only records the error.
168   *
169   * Use {@code env.verifyNoAsyncErrorsNoDelay()} at the end of your TCK tests to verify there no flops called during it's execution.
170   * To check investigate asyncErrors more closely you can use {@code expectError} methods or collect the error directly
171   * from the environment using {@code env.dropAsyncError()}.
172   *
173   * To clear asyncErrors you can call {@link org.reactivestreams.tck.TestEnvironment#clearAsyncErrors()}
174   */
175  public <T> T flopAndFail(String msg) {
176    try {
177      fail(msg);
178    } catch (Throwable t) {
179      asyncErrors.add(t);
180      fail(msg, t);
181    }
182    return null; // unreachable, the previous block will always exit by throwing
183  }
184
185
186
187  public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
188    subscribe(pub, sub, defaultTimeoutMillis);
189  }
190
191  public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException {
192    pub.subscribe(sub);
193    sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub));
194    verifyNoAsyncErrorsNoDelay();
195  }
196
197  public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException {
198    ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this);
199    subscribe(pub, sub, defaultTimeoutMillis());
200    return sub;
201  }
202
203  public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException {
204    return newManualSubscriber(pub, defaultTimeoutMillis());
205  }
206
207  public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException {
208    ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this);
209    subscribe(pub, sub, timeoutMillis);
210    return sub;
211  }
212
213  public void clearAsyncErrors() {
214    asyncErrors.clear();
215  }
216
217  public Throwable dropAsyncError() {
218    try {
219      return asyncErrors.remove(0);
220    } catch (IndexOutOfBoundsException ex) {
221      return null;
222    }
223  }
224
225  /**
226   * Waits for {@link TestEnvironment#defaultTimeoutMillis()} and then verifies that no asynchronous errors
227   * were signalled pior to, or during that time (by calling {@code flop()}).
228   */
229  public void verifyNoAsyncErrors() {
230    verifyNoAsyncErrors(defaultTimeoutMillis());
231  }
232
233  /**
234   * This version of {@code verifyNoAsyncErrors} should be used when errors still could be signalled
235   * asynchronously during {@link TestEnvironment#defaultTimeoutMillis()} time.
236   * <p></p>
237   * It will immediatly check if any async errors were signaled (using {@link TestEnvironment#flop(String)},
238   * and if no errors encountered wait for another default timeout as the errors may yet be signalled.
239   * The initial check is performed in order to fail-fast in case of an already failed test.
240   */
241  public void verifyNoAsyncErrors(long delay) {
242    try {
243      verifyNoAsyncErrorsNoDelay();
244
245      Thread.sleep(delay);
246      verifyNoAsyncErrorsNoDelay();
247    } catch (InterruptedException e) {
248      throw new RuntimeException(e);
249    }
250  }
251
252  /**
253   * Verifies that no asynchronous errors were signalled pior to calling this method (by calling {@code flop()}).
254   * This version of verifyNoAsyncError <b>does not wait before checking for asynchronous errors</b>, and is to be used
255   * for example in tight loops etc.
256   */
257  public void verifyNoAsyncErrorsNoDelay() {
258    for (Throwable e : asyncErrors) {
259      if (e instanceof AssertionError) {
260        throw (AssertionError) e;
261      } else {
262        fail(String.format("Async error during test execution: %s", e.getMessage()), e);
263      }
264    }
265  }
266
267  /** If {@code TestEnvironment#printlnDebug} is true, print debug message to std out. */
268  public void debug(String msg) {
269    if (printlnDebug)
270      System.out.printf("[TCK-DEBUG] %s%n", msg);
271  }
272
273  /**
274   * Looks for given {@code method} method in stack trace.
275   * Can be used to answer questions like "was this method called from onComplete?".
276   *
277   * @return the caller's StackTraceElement at which he the looked for method was found in the call stack, EMPTY otherwise
278   */
279  public Optional<StackTraceElement> findCallerMethodInStackTrace(String method) {
280    final Throwable thr = new Throwable(); // gets the stacktrace
281
282    for (StackTraceElement stackElement : thr.getStackTrace()) {
283      if (stackElement.getMethodName().equals(method)) {
284        return Optional.of(stackElement);
285      }
286    }
287    return Optional.empty();
288  }
289
290  // ---- classes ----
291
292  /**
293   * {@link Subscriber} implementation which can be steered by test code and asserted on.
294   */
295  public static class ManualSubscriber<T> extends TestSubscriber<T> {
296    Receptacle<T> received;
297
298    public ManualSubscriber(TestEnvironment env) {
299      super(env);
300      received = new Receptacle<T>(this.env);
301    }
302
303    @Override
304    public void onNext(T element) {
305      try {
306        received.add(element);
307      } catch (IllegalStateException ex) {
308          // error message refinement
309          throw new SubscriberBufferOverflowException(
310            String.format("Received more than bufferSize (%d) onNext signals. " +
311                            "The Publisher probably emited more signals than expected!",
312                          received.QUEUE_SIZE), ex);
313      }
314    }
315
316    @Override
317    public void onComplete() {
318      received.complete();
319    }
320
321    public void request(long elements) {
322      subscription.value().request(elements);
323    }
324
325    public T requestNextElement() throws InterruptedException {
326      return requestNextElement(env.defaultTimeoutMillis());
327    }
328
329    public T requestNextElement(long timeoutMillis) throws InterruptedException {
330      return requestNextElement(timeoutMillis, "Did not receive expected element");
331    }
332
333    public T requestNextElement(String errorMsg) throws InterruptedException {
334      return requestNextElement(env.defaultTimeoutMillis(), errorMsg);
335    }
336
337    public T requestNextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
338      request(1);
339      return nextElement(timeoutMillis, errorMsg);
340    }
341
342    public Optional<T> requestNextElementOrEndOfStream(String errorMsg) throws InterruptedException {
343      return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), errorMsg);
344    }
345
346    public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis) throws InterruptedException {
347      return requestNextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion");
348    }
349
350    public Optional<T> requestNextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
351      request(1);
352      return nextElementOrEndOfStream(timeoutMillis, errorMsg);
353    }
354
355    public void requestEndOfStream() throws InterruptedException {
356      requestEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
357    }
358
359    public void requestEndOfStream(long timeoutMillis) throws InterruptedException {
360      requestEndOfStream(timeoutMillis, "Did not receive expected stream completion");
361    }
362
363    public void requestEndOfStream(String errorMsg) throws InterruptedException {
364      requestEndOfStream(env.defaultTimeoutMillis(), errorMsg);
365    }
366
367    public void requestEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
368      request(1);
369      expectCompletion(timeoutMillis, errorMsg);
370    }
371
372    public List<T> requestNextElements(long elements) throws InterruptedException {
373      request(elements);
374      return nextElements(elements, env.defaultTimeoutMillis());
375    }
376
377    public List<T> requestNextElements(long elements, long timeoutMillis) throws InterruptedException {
378      request(elements);
379      return nextElements(elements, timeoutMillis, String.format("Did not receive %d expected elements", elements));
380    }
381
382    public List<T> requestNextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
383      request(elements);
384      return nextElements(elements, timeoutMillis, errorMsg);
385    }
386
387    public T nextElement() throws InterruptedException {
388      return nextElement(env.defaultTimeoutMillis());
389    }
390
391    public T nextElement(long timeoutMillis) throws InterruptedException {
392      return nextElement(timeoutMillis, "Did not receive expected element");
393    }
394
395    public T nextElement(String errorMsg) throws InterruptedException {
396      return nextElement(env.defaultTimeoutMillis(), errorMsg);
397    }
398
399    public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
400      return received.next(timeoutMillis, errorMsg);
401    }
402
403    public Optional<T> nextElementOrEndOfStream() throws InterruptedException {
404      return nextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
405    }
406
407    public Optional<T> nextElementOrEndOfStream(long timeoutMillis) throws InterruptedException {
408      return nextElementOrEndOfStream(timeoutMillis, "Did not receive expected stream completion");
409    }
410
411    public Optional<T> nextElementOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
412      return received.nextOrEndOfStream(timeoutMillis, errorMsg);
413    }
414
415    public List<T> nextElements(long elements) throws InterruptedException {
416      return nextElements(elements, env.defaultTimeoutMillis(), "Did not receive expected element or completion");
417    }
418
419    public List<T> nextElements(long elements, String errorMsg) throws InterruptedException {
420      return nextElements(elements, env.defaultTimeoutMillis(), errorMsg);
421    }
422
423    public List<T> nextElements(long elements, long timeoutMillis) throws InterruptedException {
424      return nextElements(elements, timeoutMillis, "Did not receive expected element or completion");
425    }
426
427    public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
428      return received.nextN(elements, timeoutMillis, errorMsg);
429    }
430
431    public void expectNext(T expected) throws InterruptedException {
432      expectNext(expected, env.defaultTimeoutMillis());
433    }
434
435    public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
436      T received = nextElement(timeoutMillis, "Did not receive expected element on downstream");
437      if (!received.equals(expected)) {
438        env.flop(String.format("Expected element %s on downstream but received %s", expected, received));
439      }
440    }
441
442    public void expectCompletion() throws InterruptedException {
443      expectCompletion(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
444    }
445
446    public void expectCompletion(long timeoutMillis) throws InterruptedException {
447      expectCompletion(timeoutMillis, "Did not receive expected stream completion");
448    }
449
450    public void expectCompletion(String errorMsg) throws InterruptedException {
451      expectCompletion(env.defaultTimeoutMillis(), errorMsg);
452    }
453
454    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
455      received.expectCompletion(timeoutMillis, errorMsg);
456    }
457
458    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception {
459      expectErrorWithMessage(expected, requiredMessagePart, env.defaultTimeoutMillis());
460    }
461
462    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
463    public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart, long timeoutMillis) throws Exception {
464      final E err = expectError(expected, timeoutMillis);
465      final String message = err.getMessage();
466      assertTrue(message.contains(requiredMessagePart),
467                 String.format("Got expected exception [%s] but missing message part [%s], was: %s",
468                               err.getClass(), requiredMessagePart, err.getMessage()));
469    }
470
471    public <E extends Throwable> E expectError(Class<E> expected) throws Exception {
472      return expectError(expected, env.defaultTimeoutMillis());
473    }
474
475    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis) throws Exception {
476      return expectError(expected, timeoutMillis, String.format("Expected onError(%s)", expected.getName()));
477    }
478
479    public <E extends Throwable> E expectError(Class<E> expected, String errorMsg) throws Exception {
480      return expectError(expected, env.defaultTimeoutMillis(), errorMsg);
481    }
482
483    public <E extends Throwable> E expectError(Class<E> expected, long timeoutMillis, String errorMsg) throws Exception {
484      return received.expectError(expected, timeoutMillis, errorMsg);
485    }
486
487    public void expectNone() throws InterruptedException {
488      expectNone(env.defaultTimeoutMillis());
489    }
490
491    public void expectNone(String errMsgPrefix) throws InterruptedException {
492      expectNone(env.defaultTimeoutMillis(), errMsgPrefix);
493    }
494
495    public void expectNone(long withinMillis) throws InterruptedException {
496      expectNone(withinMillis, "Did not expect an element but got element");
497    }
498
499    public void expectNone(long withinMillis, String errMsgPrefix) throws InterruptedException {
500      received.expectNone(withinMillis, errMsgPrefix);
501    }
502
503  }
504
505  public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> {
506
507    public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) {
508      super(env);
509    }
510
511    @Override
512    public void onNext(T element) {
513      env.debug(String.format("%s::onNext(%s)", this, element));
514      if (subscription.isCompleted()) {
515        super.onNext(element);
516      } else {
517        env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
518      }
519    }
520
521    @Override
522    public void onComplete() {
523      env.debug(this + "::onComplete()");
524      if (subscription.isCompleted()) {
525        super.onComplete();
526      } else {
527        env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe");
528      }
529    }
530
531    @Override
532    public void onSubscribe(Subscription s) {
533      env.debug(String.format("%s::onSubscribe(%s)", this, s));
534      if (!subscription.isCompleted()) {
535        subscription.complete(s);
536      } else {
537        env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber");
538      }
539    }
540
541    @Override
542    public void onError(Throwable cause) {
543      env.debug(String.format("%s::onError(%s)", this, cause));
544      if (subscription.isCompleted()) {
545        super.onError(cause);
546      } else {
547        env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause));
548      }
549    }
550  }
551
552  /**
553   * Similar to {@link org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport}
554   * but does not accumulate values signalled via <code>onNext</code>, thus it can not be used to assert
555   * values signalled to this subscriber. Instead it may be used to quickly drain a given publisher.
556   */
557  public static class BlackholeSubscriberWithSubscriptionSupport<T>
558    extends ManualSubscriberWithSubscriptionSupport<T> {
559
560    public BlackholeSubscriberWithSubscriptionSupport(TestEnvironment env) {
561      super(env);
562    }
563
564    @Override
565    public void onNext(T element) {
566      env.debug(String.format("%s::onNext(%s)", this, element));
567      if (!subscription.isCompleted()) {
568        env.flop(String.format("Subscriber::onNext(%s) called before Subscriber::onSubscribe", element));
569      }
570    }
571
572    @Override
573    public T nextElement(long timeoutMillis, String errorMsg) throws InterruptedException {
574      throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!");
575    }
576
577    @Override
578    public List<T> nextElements(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
579      throw new RuntimeException("Can not expect elements from BlackholeSubscriber, use ManualSubscriber instead!");
580    }
581  }
582
583  public static class TestSubscriber<T> implements Subscriber<T> {
584    final Promise<Subscription> subscription;
585
586    protected final TestEnvironment env;
587
588    public TestSubscriber(TestEnvironment env) {
589      this.env = env;
590      subscription = new Promise<Subscription>(env);
591    }
592
593    @Override
594    public void onError(Throwable cause) {
595      env.flop(cause, String.format("Unexpected Subscriber::onError(%s)", cause));
596    }
597
598    @Override
599    public void onComplete() {
600      env.flop("Unexpected Subscriber::onComplete()");
601    }
602
603    @Override
604    public void onNext(T element) {
605      env.flop(String.format("Unexpected Subscriber::onNext(%s)", element));
606    }
607
608    @Override
609    public void onSubscribe(Subscription subscription) {
610      env.flop(String.format("Unexpected Subscriber::onSubscribe(%s)", subscription));
611    }
612
613    public void cancel() {
614      if (subscription.isCompleted()) {
615        subscription.value().cancel();
616      } else {
617        env.flop("Cannot cancel a subscription before having received it");
618      }
619    }
620  }
621
622  public static class ManualPublisher<T> implements Publisher<T> {
623    protected final TestEnvironment env;
624
625    protected long pendingDemand = 0L;
626    protected Promise<Subscriber<? super T>> subscriber;
627
628    protected final Receptacle<Long> requests;
629
630    protected final Latch cancelled;
631
632    public ManualPublisher(TestEnvironment env) {
633      this.env = env;
634      requests = new Receptacle<Long>(env);
635      cancelled = new Latch(env);
636      subscriber = new Promise<Subscriber<? super T>>(this.env);
637    }
638
639    @Override
640    public void subscribe(Subscriber<? super T> s) {
641      if (!subscriber.isCompleted()) {
642        subscriber.completeImmediatly(s);
643
644        Subscription subs = new Subscription() {
645          @Override
646          public void request(long elements) {
647            requests.add(elements);
648          }
649
650          @Override
651          public void cancel() {
652            cancelled.close();
653          }
654        };
655        s.onSubscribe(subs);
656
657      } else {
658        env.flop("TestPublisher doesn't support more than one Subscriber");
659      }
660    }
661
662    public void sendNext(T element) {
663      if (subscriber.isCompleted()) {
664        subscriber.value().onNext(element);
665      } else {
666        env.flop("Cannot sendNext before having a Subscriber");
667      }
668    }
669
670    public void sendCompletion() {
671      if (subscriber.isCompleted()) {
672        subscriber.value().onComplete();
673      } else {
674        env.flop("Cannot sendCompletion before having a Subscriber");
675      }
676    }
677
678    public void sendError(Throwable cause) {
679      if (subscriber.isCompleted()) {
680        subscriber.value().onError(cause);
681      } else {
682        env.flop("Cannot sendError before having a Subscriber");
683      }
684    }
685
686    public long expectRequest() throws InterruptedException {
687      return expectRequest(env.defaultTimeoutMillis());
688    }
689
690    public long expectRequest(long timeoutMillis) throws InterruptedException {
691      long requested = requests.next(timeoutMillis, "Did not receive expected `request` call");
692      if (requested <= 0) {
693        return env.<Long>flopAndFail(String.format("Requests cannot be zero or negative but received request(%s)", requested));
694      } else {
695        pendingDemand += requested;
696        return requested;
697      }
698    }
699
700    public void expectExactRequest(long expected) throws InterruptedException {
701      expectExactRequest(expected, env.defaultTimeoutMillis());
702    }
703
704    public void expectExactRequest(long expected, long timeoutMillis) throws InterruptedException {
705      long requested = expectRequest(timeoutMillis);
706      if (requested != expected) {
707        env.flop(String.format("Received `request(%d)` on upstream but expected `request(%d)`", requested, expected));
708      }
709      pendingDemand += requested;
710    }
711
712    public void expectNoRequest() throws InterruptedException {
713      expectNoRequest(env.defaultTimeoutMillis());
714    }
715
716    public void expectNoRequest(long timeoutMillis) throws InterruptedException {
717      requests.expectNone(timeoutMillis, "Received an unexpected call to: request: ");
718    }
719
720    public void expectCancelling() throws InterruptedException {
721      expectCancelling(env.defaultTimeoutMillis());
722    }
723
724    public void expectCancelling(long timeoutMillis) throws InterruptedException {
725      cancelled.expectClose(timeoutMillis, "Did not receive expected cancelling of upstream subscription");
726    }
727  }
728
729  /**
730   * Like a CountDownLatch, but resettable and with some convenience methods
731   */
732  public static class Latch {
733    private final TestEnvironment env;
734    volatile private CountDownLatch countDownLatch = new CountDownLatch(1);
735
736    public Latch(TestEnvironment env) {
737      this.env = env;
738    }
739
740    public void reOpen() {
741      countDownLatch = new CountDownLatch(1);
742    }
743
744    public boolean isClosed() {
745      return countDownLatch.getCount() == 0;
746    }
747
748    public void close() {
749      countDownLatch.countDown();
750    }
751
752    public void assertClosed(String openErrorMsg) {
753      if (!isClosed()) {
754        env.flop(new ExpectedClosedLatchException(openErrorMsg));
755      }
756    }
757
758    public void assertOpen(String closedErrorMsg) {
759      if (isClosed()) {
760        env.flop(new ExpectedOpenLatchException(closedErrorMsg));
761      }
762    }
763
764    public void expectClose(String notClosedErrorMsg) throws InterruptedException {
765      expectClose(env.defaultTimeoutMillis(), notClosedErrorMsg);
766    }
767
768    public void expectClose(long timeoutMillis, String notClosedErrorMsg) throws InterruptedException {
769      countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
770      if (countDownLatch.getCount() > 0) {
771        env.flop(String.format("%s within %d ms", notClosedErrorMsg, timeoutMillis));
772      }
773    }
774
775    static final class ExpectedOpenLatchException extends RuntimeException {
776      public ExpectedOpenLatchException(String message) {
777        super(message);
778      }
779    }
780
781    static final class ExpectedClosedLatchException extends RuntimeException {
782      public ExpectedClosedLatchException(String message) {
783        super(message);
784      }
785    }
786
787  }
788
789  // simple promise for *one* value, which cannot be reset
790  public static class Promise<T> {
791    private final TestEnvironment env;
792
793    public static <T> Promise<T> completed(TestEnvironment env, T value) {
794      Promise<T> promise = new Promise<T>(env);
795      promise.completeImmediatly(value);
796      return promise;
797    }
798
799    public Promise(TestEnvironment env) {
800      this.env = env;
801    }
802
803    private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1);
804    private volatile T _value = null;
805
806    public T value() {
807      if (isCompleted()) {
808        return _value;
809      } else {
810        env.flop("Cannot access promise value before completion");
811        return null;
812      }
813    }
814
815    public boolean isCompleted() {
816      return _value != null;
817    }
818
819    /**
820     * Allows using expectCompletion to await for completion of the value and complete it _then_
821     */
822    public void complete(T value) {
823      abq.add(value);
824    }
825
826    /**
827     * Completes the promise right away, it is not possible to expectCompletion on a Promise completed this way
828     */
829    public void completeImmediatly(T value) {
830      complete(value); // complete!
831      _value = value;  // immediatly!
832    }
833
834    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
835      if (!isCompleted()) {
836        T val = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
837
838        if (val == null) {
839          env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
840        } else {
841          _value = val;
842        }
843      }
844    }
845  }
846
847  // a "Promise" for multiple values, which also supports "end-of-stream reached"
848  public static class Receptacle<T> {
849    final int QUEUE_SIZE = 2 * TEST_BUFFER_SIZE;
850    private final TestEnvironment env;
851
852    private final ArrayBlockingQueue<Optional<T>> abq = new ArrayBlockingQueue<Optional<T>>(QUEUE_SIZE);
853
854    private final Latch completedLatch;
855
856    Receptacle(TestEnvironment env) {
857      this.env = env;
858      this.completedLatch = new Latch(env);
859    }
860
861    public void add(T value) {
862      completedLatch.assertOpen(String.format("Unexpected element %s received after stream completed", value));
863
864      abq.add(Optional.of(value));
865    }
866
867    public void complete() {
868      completedLatch.assertOpen("Unexpected additional complete signal received!");
869      completedLatch.close();
870
871      abq.add(Optional.<T>empty());
872    }
873
874    public T next(long timeoutMillis, String errorMsg) throws InterruptedException {
875      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
876
877      if (value == null) {
878        return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis));
879      } else if (value.isDefined()) {
880        return value.get();
881      } else {
882        return env.flopAndFail("Expected element but got end-of-stream");
883      }
884    }
885
886    public Optional<T> nextOrEndOfStream(long timeoutMillis, String errorMsg) throws InterruptedException {
887      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
888
889      if (value == null) {
890        env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
891        return Optional.empty();
892      }
893
894      return value;
895    }
896
897    /**
898     * @param timeoutMillis total timeout time for awaiting all {@code elements} number of elements
899     */
900    public List<T> nextN(long elements, long timeoutMillis, String errorMsg) throws InterruptedException {
901      List<T> result = new LinkedList<T>();
902      long remaining = elements;
903      long deadline = System.currentTimeMillis() + timeoutMillis;
904      while (remaining > 0) {
905        long remainingMillis = deadline - System.currentTimeMillis();
906
907        result.add(next(remainingMillis, errorMsg));
908        remaining--;
909      }
910
911      return result;
912    }
913
914
915    public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
916      Optional<T> value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS);
917
918      if (value == null) {
919        env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
920      } else if (value.isDefined()) {
921        env.flop(String.format("Expected end-of-stream but got element [%s]", value.get()));
922      } // else, ok
923    }
924
925    @SuppressWarnings("unchecked")
926    public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception {
927      Thread.sleep(timeoutMillis);
928
929      if (env.asyncErrors.isEmpty()) {
930        return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis));
931      } else {
932        // ok, there was an expected error
933        Throwable thrown = env.asyncErrors.remove(0);
934
935        if (clazz.isInstance(thrown)) {
936          return (E) thrown;
937        } else {
938
939          return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s",
940                                               errorMsg, timeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName()));
941        }
942      }
943    }
944
945    public void expectNone(long withinMillis, String errorMsgPrefix) throws InterruptedException {
946      Thread.sleep(withinMillis);
947      Optional<T> value = abq.poll();
948
949      if (value == null) {
950        // ok
951      } else if (value.isDefined()) {
952        env.flop(String.format("%s [%s]", errorMsgPrefix, value.get()));
953      } else {
954        env.flop("Expected no element but got end-of-stream");
955      }
956    }
957  }
958}
959