001/***************************************************
002 * Licensed under MIT No Attribution (SPDX: MIT-0) *
003 ***************************************************/
004
005package org.reactivestreams.tck;
006
007import org.reactivestreams.Publisher;
008import org.reactivestreams.tck.flow.support.Function;
009import org.reactivestreams.tck.flow.support.HelperPublisher;
010import org.reactivestreams.tck.flow.support.InfiniteHelperPublisher;
011
012import java.util.concurrent.ExecutorService;
013
014/**
015 * Type which is able to create elements based on a seed {@code id} value.
016 * <p>
017 * Simplest implementations will simply return the incoming id as the element.
018 *
019 * @param <T> type of element to be delivered to the Subscriber
020 */
021public abstract class WithHelperPublisher<T> {
022
023  /** ExecutorService to be used by the provided helper {@link org.reactivestreams.Publisher} */
024  public abstract ExecutorService publisherExecutorService();
025
026  /**
027   * Implement this method to match your expected element type.
028   * In case of implementing a simple Subscriber which is able to consume any kind of element simply return the
029   * incoming {@code element} element.
030   * <p>
031   * Sometimes the Subscriber may be limited in what type of element it is able to consume, this you may have to implement
032   * this method such that the emitted element matches the Subscribers requirements. Simplest implementations would be
033   * to simply pass in the {@code element} as payload of your custom element, such as appending it to a String or other identifier.
034   * <p>
035   * <b>Warning:</b> This method may be called concurrently by the helper publisher, thus it should be implemented in a
036   * thread-safe manner.
037   *
038   * @return element of the matching type {@code T} that will be delivered to the tested Subscriber
039   */
040  public abstract T createElement(int element);
041
042  /**
043   * Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
044   * <p>
045   * By default an <b>asynchronously signalling Publisher</b> is provided, which will use {@link #createElement(int)}
046   * to generate elements type your Subscriber is able to consume.
047   * <p>
048   * Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber
049   * when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for
050   * (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways:
051   * <ul>
052   *   <li>
053   *     If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
054   *   </li>
055   *   <li>
056   *     If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
057   *     In other words, it should represent a "completed stream".
058   *   </li>
059   * </ul>
060   */
061  @SuppressWarnings("unchecked")
062  public Publisher<T> createHelperPublisher(long elements) {
063    final Function<Integer, T> mkElement = new Function<Integer, T>() {
064      @Override public T apply(Integer id) throws Throwable {
065        return createElement(id);
066      }
067    };
068
069    if (elements > Integer.MAX_VALUE) return new InfiniteHelperPublisher(mkElement, publisherExecutorService());
070    else return new HelperPublisher(0, (int) elements, mkElement, publisherExecutorService());
071  }
072
073}