package org.reactivestreams.tck;

import org.reactivestreams.Publisher;
import org.reactivestreams.tck.support.Function;
import org.reactivestreams.tck.support.HelperPublisher;
import org.reactivestreams.tck.support.InfiniteHelperPublisher;

import java.util.concurrent.ExecutorService;

/**
 * Base type for test setups that can manufacture stream elements from an integer seed ({@code id}).
 * <p>
 * The most trivial implementation hands the seed straight back as the element.
 *
 * @param <T> type of element to be delivered to the Subscriber
 */
public abstract class WithHelperPublisher<T> {

  /**
   * Supplies the {@link ExecutorService} handed to the helper {@link org.reactivestreams.Publisher}
   * built by {@link #createHelperPublisher(long)}.
   *
   * @return the executor service the helper publisher should use
   */
  public abstract ExecutorService publisherExecutorService();

  /**
   * Produces one element of type {@code T} for the given seed value.
   * <p>
   * A Subscriber that accepts any kind of element can simply be fed the incoming {@code element} itself.
   * <p>
   * Some Subscribers only accept a restricted kind of element; in that case implement this method so that
   * the produced value satisfies the Subscriber's requirements. An easy approach is to embed {@code element}
   * as the payload of your own element type, for example by appending it to a String or another identifier.
   * <p>
   * <b>Warning:</b> the helper publisher may invoke this method concurrently, so the implementation
   * should be thread-safe.
   *
   * @param element seed value from which the element is derived
   * @return element of the matching type {@code T} that will be delivered to the tested Subscriber
   */
  public abstract T createElement(int element);

  /**
   * Builds the Publisher that the Subscriber under test is subscribed to and verified against.
   * <p>
   * The default implementation provides an <b>asynchronously signalling Publisher</b> whose elements are
   * generated through {@link #createElement(int)}, so they are of a type your Subscriber can consume.
   * Requests for more than {@code Integer.MAX_VALUE} elements are served by an
   * {@link InfiniteHelperPublisher}; every other request is served by a {@link HelperPublisher}
   * constructed with {@code 0} and {@code elements} narrowed to {@code int}.
   * <p>
   * You may override this method with a custom helper Publisher, for example to check how a Subscriber
   * behaves when facing a synchronous Publisher. An override MUST emit exactly the number of elements asked
   * for (via the {@code elements} parameter) and MUST additionally treat these values specially:
   * <ul>
   *   <li>
   *     If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite.
   *   </li>
   *   <li>
   *     If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediately.
   *     In other words, it should represent a "completed stream".
   *   </li>
   * </ul>
   *
   * @param elements number of elements the returned Publisher has to emit
   * @return the Publisher to which the tested Subscriber will be subscribed
   */
  @SuppressWarnings("unchecked")
  public Publisher<T> createHelperPublisher(long elements) {
    // Adapter that lets the helper publishers obtain elements through createElement(int).
    final Function<Integer, T> elementFactory = new Function<Integer, T>() {
      @Override
      public T apply(Integer seed) throws Throwable {
        return createElement(seed);
      }
    };
    final ExecutorService executor = publisherExecutorService();

    // Counts that fit into an int get a bounded publisher; the cast below cannot overflow here.
    if (elements <= Integer.MAX_VALUE) {
      return new HelperPublisher(0, (int) elements, elementFactory, executor);
    }

    // Anything larger cannot be expressed as an int bound and is served by the infinite publisher.
    return new InfiniteHelperPublisher(elementFactory, executor);
  }

}