001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams.tck; 013 014import org.reactivestreams.Publisher; 015import org.reactivestreams.tck.flow.support.Function; 016import org.reactivestreams.tck.flow.support.HelperPublisher; 017import org.reactivestreams.tck.flow.support.InfiniteHelperPublisher; 018 019import java.util.concurrent.ExecutorService; 020 021/** 022 * Type which is able to create elements based on a seed {@code id} value. 023 * <p> 024 * Simplest implementations will simply return the incoming id as the element. 025 * 026 * @param <T> type of element to be delivered to the Subscriber 027 */ 028public abstract class WithHelperPublisher<T> { 029 030 /** ExecutorService to be used by the provided helper {@link org.reactivestreams.Publisher} */ 031 public abstract ExecutorService publisherExecutorService(); 032 033 /** 034 * Implement this method to match your expected element type. 035 * In case of implementing a simple Subscriber which is able to consume any kind of element simply return the 036 * incoming {@code element} element. 037 * <p> 038 * Sometimes the Subscriber may be limited in what type of element it is able to consume, this you may have to implement 039 * this method such that the emitted element matches the Subscribers requirements. Simplest implementations would be 040 * to simply pass in the {@code element} as payload of your custom element, such as appending it to a String or other identifier. 041 * <p> 042 * <b>Warning:</b> This method may be called concurrently by the helper publisher, thus it should be implemented in a 043 * thread-safe manner. 044 * 045 * @return element of the matching type {@code T} that will be delivered to the tested Subscriber 046 */ 047 public abstract T createElement(int element); 048 049 /** 050 * Helper method required for creating the Publisher to which the tested Subscriber will be subscribed and tested against. 051 * <p> 052 * By default an <b>asynchronously signalling Publisher</b> is provided, which will use {@link #createElement(int)} 053 * to generate elements type your Subscriber is able to consume. 054 * <p> 055 * Sometimes you may want to implement your own custom custom helper Publisher - to validate behaviour of a Subscriber 056 * when facing a synchronous Publisher for example. If you do, it MUST emit the exact number of elements asked for 057 * (via the {@code elements} parameter) and MUST also must treat the following numbers of elements in these specific ways: 058 * <ul> 059 * <li> 060 * If {@code elements} is {@code Long.MAX_VALUE} the produced stream must be infinite. 061 * </li> 062 * <li> 063 * If {@code elements} is {@code 0} the {@code Publisher} should signal {@code onComplete} immediatly. 064 * In other words, it should represent a "completed stream". 065 * </li> 066 * </ul> 067 */ 068 @SuppressWarnings("unchecked") 069 public Publisher<T> createHelperPublisher(long elements) { 070 final Function<Integer, T> mkElement = new Function<Integer, T>() { 071 @Override public T apply(Integer id) throws Throwable { 072 return createElement(id); 073 } 074 }; 075 076 if (elements > Integer.MAX_VALUE) return new InfiniteHelperPublisher(mkElement, publisherExecutorService()); 077 else return new HelperPublisher(0, (int) elements, mkElement, publisherExecutorService()); 078 } 079 080}