001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck;
013
014import org.reactivestreams.Publisher;
015import org.reactivestreams.tck.flow.support.Function;
016import org.reactivestreams.tck.flow.support.HelperPublisher;
017import org.reactivestreams.tck.flow.support.InfiniteHelperPublisher;
018
019import java.util.concurrent.ExecutorService;
020
021/**
022 * Type which is able to create elements based on a seed {@code id} value.
023 * <p>
024 * Simplest implementations will simply return the incoming id as the element.
025 *
026 * @param <T> type of element to be delivered to the Subscriber
027 */
028public abstract class WithHelperPublisher<T> {
029
030  /** ExecutorService to be used by the provided helper {@link org.reactivestreams.Publisher} */
031  public abstract ExecutorService publisherExecutorService();
032
033  /**
034   * Implement this method to match your expected element type.
035   * In case of implementing a simple Subscriber which is able to consume any kind of element simply return the
036   * incoming {@code element} element.
037   * <p>
038   * Sometimes the Subscriber may be limited in what type of element it is able to consume, this you may have to implement
039   * this method such that the emitted element matches the Subscribers requirements. Simplest implementations would be
040   * to simply pass in the {@code element} as payload of your custom element, such as appending it to a String or other identifier.
041   * <p>
042   * <b>Warning:</b> This method may be called concurrently by the helper publisher, thus it should be implemented in a
043   * thread-safe manner.
044   *
045   * @return element of the matching type {@code T} that will be delivered to the tested Subscriber
046   */
047  public abstract T createElement(int element);
048
049  /**
050   * Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against.
051   * <p>
052   * By default an <b>asynchronously signalling Publisher</b> is provided, which will use {@link #createElement(int)}
053   * to generate elements type your Subscriber is able to consume.
054   * <p>
055   * Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber
056   * when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for
057   * (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways:
058   * <ul>
059   *   <li>
060   *     If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
061   *   </li>
062   *   <li>
063   *     If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly.
064   *     In other words, it should represent a "completed stream".
065   *   </li>
066   * </ul>
067   */
068  @SuppressWarnings("unchecked")
069  public Publisher<T> createHelperPublisher(long elements) {
070    final Function<Integer, T> mkElement = new Function<Integer, T>() {
071      @Override public T apply(Integer id) throws Throwable {
072        return createElement(id);
073      }
074    };
075
076    if (elements > Integer.MAX_VALUE) return new InfiniteHelperPublisher(mkElement, publisherExecutorService());
077    else return new HelperPublisher(0, (int) elements, mkElement, publisherExecutorService());
078  }
079
080}