001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.tck.flow;
013
014import org.reactivestreams.*;
015import org.reactivestreams.tck.IdentityProcessorVerification;
016import org.reactivestreams.tck.TestEnvironment;
017import org.reactivestreams.tck.flow.support.SubscriberWhiteboxVerificationRules;
018import org.reactivestreams.tck.flow.support.PublisherVerificationRules;
019
020import java.util.concurrent.Flow;
021
022public abstract class IdentityFlowProcessorVerification<T> extends IdentityProcessorVerification<T>
023  implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {
024
025  public IdentityFlowProcessorVerification(TestEnvironment env) {
026    super(env);
027  }
028
029  public IdentityFlowProcessorVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis) {
030    super(env, publisherReferenceGCTimeoutMillis);
031  }
032
033  public IdentityFlowProcessorVerification(TestEnvironment env, long publisherReferenceGCTimeoutMillis, int processorBufferSize) {
034    super(env, publisherReferenceGCTimeoutMillis, processorBufferSize);
035  }
036
037  /**
038   * By implementing this method, additional TCK tests concerning a "failed" Flow publishers will be run.
039   *
040   * The expected behaviour of the {@link Flow.Publisher} returned by this method is hand out a subscription,
041   * followed by signalling {@code onError} on it, as specified by Rule 1.9.
042   *
043   * If you want to ignore these additional tests, return {@code null} from this method.
044   */
045  protected abstract Flow.Publisher<T> createFailedFlowPublisher();
046
047  /**
048   * This is the main method you must implement in your test incarnation.
049   * It must create a {@link Flow.Processor}, which simply forwards all stream elements from its upstream
050   * to its downstream. It must be able to internally buffer the given number of elements.
051   *
052   * @param bufferSize number of elements the processor is required to be able to buffer.
053   */
054  protected abstract Flow.Processor<T,T> createIdentityFlowProcessor(int bufferSize);
055
056  @Override
057  public final Processor<T, T> createIdentityProcessor(int bufferSize) {
058    return FlowAdapters.toProcessor(createIdentityFlowProcessor(bufferSize));
059  }
060
061  @Override
062  public final Publisher<T> createFailedPublisher() {
063    return FlowAdapters.toPublisher(createFailedFlowPublisher());
064  }
065
066}