001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams;
013
014import java.util.concurrent.Flow;
015
016/**
017 * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
018 */
019public final class FlowAdapters {
020    /** Utility class. */
021    private FlowAdapters() {
022        throw new IllegalStateException("No instances!");
023    }
024
025    /**
026     * Converts a Flow Publisher into a Reactive Streams Publisher.
027     * @param <T> the element type
028     * @param flowPublisher the source Flow Publisher to convert
029     * @return the equivalent Reactive Streams Publisher
030     */
031    @SuppressWarnings("unchecked")
032    public static <T> org.reactivestreams.Publisher<T> toPublisher(
033            Flow.Publisher<? extends T> flowPublisher) {
034        if (flowPublisher == null) {
035            throw new NullPointerException("flowPublisher");
036        }
037        if (flowPublisher instanceof FlowPublisherFromReactive) {
038            return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
039        }
040        if (flowPublisher instanceof org.reactivestreams.Publisher) {
041            return (org.reactivestreams.Publisher<T>)flowPublisher;
042        }
043        return new ReactivePublisherFromFlow<T>(flowPublisher);
044    }
045
046    /**
047     * Converts a Reactive Streams Publisher into a Flow Publisher.
048     * @param <T> the element type
049     * @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert
050     * @return the equivalent Flow Publisher
051     */
052    @SuppressWarnings("unchecked")
053    public static <T> Flow.Publisher<T> toFlowPublisher(
054            org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
055    ) {
056        if (reactiveStreamsPublisher == null) {
057            throw new NullPointerException("reactiveStreamsPublisher");
058        }
059        if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
060            return (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
061        }
062        if (reactiveStreamsPublisher instanceof Flow.Publisher) {
063            return (Flow.Publisher<T>)reactiveStreamsPublisher;
064        }
065        return new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
066    }
067
068    /**
069     * Converts a Flow Processor into a Reactive Streams Processor.
070     * @param <T> the input value type
071     * @param <U> the output value type
072     * @param flowProcessor the source Flow Processor to convert
073     * @return the equivalent Reactive Streams Processor
074     */
075    @SuppressWarnings("unchecked")
076    public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
077            Flow.Processor<? super T, ? extends U> flowProcessor
078    ) {
079        if (flowProcessor == null) {
080            throw new NullPointerException("flowProcessor");
081        }
082        if (flowProcessor instanceof FlowToReactiveProcessor) {
083            return (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
084        }
085        if (flowProcessor instanceof org.reactivestreams.Processor) {
086            return (org.reactivestreams.Processor<T, U>)flowProcessor;
087        }
088        return new ReactiveToFlowProcessor<T, U>(flowProcessor);
089    }
090
091    /**
092     * Converts a Reactive Streams Processor into a Flow Processor.
093     * @param <T> the input value type
094     * @param <U> the output value type
095     * @param reactiveStreamsProcessor the source Reactive Streams Processor to convert
096     * @return the equivalent Flow Processor
097     */
098    @SuppressWarnings("unchecked")
099    public static <T, U> Flow.Processor<T, U> toFlowProcessor(
100            org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
101        ) {
102        if (reactiveStreamsProcessor == null) {
103            throw new NullPointerException("reactiveStreamsProcessor");
104        }
105        if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
106            return (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
107        }
108        if (reactiveStreamsProcessor instanceof Flow.Processor) {
109            return (Flow.Processor<T, U>)reactiveStreamsProcessor;
110        }
111        return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
112    }
113
114    /**
115     * Converts a Reactive Streams Subscriber into a Flow Subscriber.
116     * @param <T> the input and output value type
117     * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
118     * @return the equivalent Flow Subscriber
119     */
120    @SuppressWarnings("unchecked")
121    public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
122        if (reactiveStreamsSubscriber == null) {
123            throw new NullPointerException("reactiveStreamsSubscriber");
124        }
125        if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
126            return (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
127        }
128        if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
129            return (Flow.Subscriber<T>)reactiveStreamsSubscriber;
130        }
131        return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
132    }
133
134    /**
135     * Converts a Flow Subscriber into a Reactive Streams Subscriber.
136     * @param <T> the input and output value type
137     * @param flowSubscriber the Flow Subscriber instance to convert
138     * @return the equivalent Reactive Streams Subscriber
139     */
140    @SuppressWarnings("unchecked")
141    public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
142        if (flowSubscriber == null) {
143            throw new NullPointerException("flowSubscriber");
144        }
145        if (flowSubscriber instanceof FlowToReactiveSubscriber) {
146            return (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
147        }
148        if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
149            return (org.reactivestreams.Subscriber<T>)flowSubscriber;
150        }
151        return new ReactiveToFlowSubscriber<T>(flowSubscriber);
152    }
153
154    /**
155     * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription.
156     */
157    static final class FlowToReactiveSubscription implements Flow.Subscription {
158        final org.reactivestreams.Subscription reactiveStreams;
159
160        public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
161            this.reactiveStreams = reactive;
162        }
163
164        @Override
165        public void request(long n) {
166            reactiveStreams.request(n);
167        }
168
169        @Override
170        public void cancel() {
171            reactiveStreams.cancel();
172        }
173
174    }
175
176    /**
177     * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription.
178     */
179    static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
180        final Flow.Subscription flow;
181
182        public ReactiveToFlowSubscription(Flow.Subscription flow) {
183            this.flow = flow;
184        }
185
186        @Override
187        public void request(long n) {
188            flow.request(n);
189        }
190
191        @Override
192        public void cancel() {
193            flow.cancel();
194        }
195
196
197    }
198
199    /**
200     * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
201     * @param <T> the element type
202     */
203    static final class FlowToReactiveSubscriber<T>
204            implements Flow.Subscriber<T> {
205        final org.reactivestreams.Subscriber<? super T> reactiveStreams;
206
207        public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
208            this.reactiveStreams = reactive;
209        }
210
211        @Override
212        public void onSubscribe(Flow.Subscription subscription) {
213            reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
214        }
215
216        @Override
217        public void onNext(T item) {
218            reactiveStreams.onNext(item);
219        }
220
221        @Override
222        public void onError(Throwable throwable) {
223            reactiveStreams.onError(throwable);
224        }
225
226        @Override
227        public void onComplete() {
228            reactiveStreams.onComplete();
229        }
230
231    }
232
233    /**
234     * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
235     * @param <T> the element type
236     */
237    static final class ReactiveToFlowSubscriber<T>
238            implements org.reactivestreams.Subscriber<T> {
239        final Flow.Subscriber<? super T> flow;
240
241        public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
242            this.flow = flow;
243        }
244
245        @Override
246        public void onSubscribe(org.reactivestreams.Subscription subscription) {
247            flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
248        }
249
250        @Override
251        public void onNext(T item) {
252            flow.onNext(item);
253        }
254
255        @Override
256        public void onError(Throwable throwable) {
257            flow.onError(throwable);
258        }
259
260        @Override
261        public void onComplete() {
262            flow.onComplete();
263        }
264
265    }
266
267    /**
268     * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it.
269     * @param <T> the input type
270     * @param <U> the output type
271     */
272    static final class ReactiveToFlowProcessor<T, U>
273            implements org.reactivestreams.Processor<T, U> {
274        final Flow.Processor<? super T, ? extends U> flow;
275
276        public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
277            this.flow = flow;
278        }
279
280        @Override
281        public void onSubscribe(org.reactivestreams.Subscription subscription) {
282            flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
283        }
284
285        @Override
286        public void onNext(T t) {
287            flow.onNext(t);
288        }
289
290        @Override
291        public void onError(Throwable t) {
292            flow.onError(t);
293        }
294
295        @Override
296        public void onComplete() {
297            flow.onComplete();
298        }
299
300        @Override
301        public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
302            flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s));
303        }
304    }
305
306    /**
307     * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it.
308     * @param <T> the input type
309     * @param <U> the output type
310     */
311    static final class FlowToReactiveProcessor<T, U>
312            implements Flow.Processor<T, U> {
313        final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;
314
315        public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
316            this.reactiveStreams = reactive;
317        }
318
319        @Override
320        public void onSubscribe(Flow.Subscription subscription) {
321            reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
322        }
323
324        @Override
325        public void onNext(T t) {
326            reactiveStreams.onNext(t);
327        }
328
329        @Override
330        public void onError(Throwable t) {
331            reactiveStreams.onError(t);
332        }
333
334        @Override
335        public void onComplete() {
336            reactiveStreams.onComplete();
337        }
338
339        @Override
340        public void subscribe(Flow.Subscriber<? super U> s) {
341            reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s));
342        }
343    }
344
345    /**
346     * Reactive Streams Publisher that wraps a Flow Publisher.
347     * @param <T> the element type
348     */
349    static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> {
350
351        final Flow.Publisher<? extends T> flow;
352
353        public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
354            this.flow = flowPublisher;
355        }
356
357        @Override
358        public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
359            flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive));
360        }
361    }
362
363    /**
364     * Flow Publisher that wraps a Reactive Streams Publisher.
365     * @param <T> the element type
366     */
367    static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> {
368
369        final org.reactivestreams.Publisher<? extends T> reactiveStreams;
370
371        public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) {
372            this.reactiveStreams = reactivePublisher;
373        }
374
375        @Override
376        public void subscribe(Flow.Subscriber<? super T> flow) {
377            reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow));
378        }
379    }
380
381}