001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams;
013
014import java.util.concurrent.Flow;
015import static java.util.Objects.requireNonNull;
016
017/**
018 * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
019 */
020public final class FlowAdapters {
021    /** Utility class. */
022    private FlowAdapters() {
023        throw new IllegalStateException("No instances!");
024    }
025
026    /**
027     * Converts a Flow Publisher into a Reactive Streams Publisher.
028     * @param <T> the element type
029     * @param flowPublisher the source Flow Publisher to convert
030     * @return the equivalent Reactive Streams Publisher
031     */
032    @SuppressWarnings("unchecked")
033    public static <T> org.reactivestreams.Publisher<T> toPublisher(
034            Flow.Publisher<? extends T> flowPublisher) {
035        requireNonNull(flowPublisher, "flowPublisher");
036        final org.reactivestreams.Publisher<T> publisher;
037        if (flowPublisher instanceof FlowPublisherFromReactive) {
038            publisher = (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
039        } else if (flowPublisher instanceof org.reactivestreams.Publisher) {
040            publisher = (org.reactivestreams.Publisher<T>)flowPublisher;
041        } else {
042            publisher = new ReactivePublisherFromFlow<T>(flowPublisher);
043        }
044        return publisher;
045    }
046
047    /**
048     * Converts a Reactive Streams Publisher into a Flow Publisher.
049     * @param <T> the element type
050     * @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert
051     * @return the equivalent Flow Publisher
052     */
053    @SuppressWarnings("unchecked")
054    public static <T> Flow.Publisher<T> toFlowPublisher(
055            org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
056    ) {
057        requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher");
058        final Flow.Publisher<T> flowPublisher;
059        if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
060            flowPublisher = (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
061        } else if (reactiveStreamsPublisher instanceof Flow.Publisher) {
062            flowPublisher = (Flow.Publisher<T>)reactiveStreamsPublisher;
063        } else {
064            flowPublisher = new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
065        }
066        return flowPublisher;
067    }
068
069    /**
070     * Converts a Flow Processor into a Reactive Streams Processor.
071     * @param <T> the input value type
072     * @param <U> the output value type
073     * @param flowProcessor the source Flow Processor to convert
074     * @return the equivalent Reactive Streams Processor
075     */
076    @SuppressWarnings("unchecked")
077    public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
078            Flow.Processor<? super T, ? extends U> flowProcessor
079    ) {
080        requireNonNull(flowProcessor, "flowProcessor");
081        final org.reactivestreams.Processor<T, U> processor;
082        if (flowProcessor instanceof FlowToReactiveProcessor) {
083            processor = (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
084        } else if (flowProcessor instanceof org.reactivestreams.Processor) {
085            processor = (org.reactivestreams.Processor<T, U>)flowProcessor;
086        } else {
087            processor = new ReactiveToFlowProcessor<T, U>(flowProcessor);
088        }
089        return processor;
090    }
091
092    /**
093     * Converts a Reactive Streams Processor into a Flow Processor.
094     * @param <T> the input value type
095     * @param <U> the output value type
096     * @param reactiveStreamsProcessor the source Reactive Streams Processor to convert
097     * @return the equivalent Flow Processor
098     */
099    @SuppressWarnings("unchecked")
100    public static <T, U> Flow.Processor<T, U> toFlowProcessor(
101            org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
102        ) {
103        requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor");
104        final Flow.Processor<T, U> flowProcessor;
105        if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
106            flowProcessor = (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
107        } else if (reactiveStreamsProcessor instanceof Flow.Processor) {
108            flowProcessor = (Flow.Processor<T, U>)reactiveStreamsProcessor;
109        } else {
110            flowProcessor = new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
111        }
112        return flowProcessor;
113    }
114
115    /**
116     * Converts a Reactive Streams Subscriber into a Flow Subscriber.
117     * @param <T> the input and output value type
118     * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
119     * @return the equivalent Flow Subscriber
120     */
121    @SuppressWarnings("unchecked")
122    public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
123        requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber");
124        final Flow.Subscriber<T> flowSubscriber;
125        if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
126            flowSubscriber = (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
127        } else if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
128            flowSubscriber = (Flow.Subscriber<T>)reactiveStreamsSubscriber;
129        } else {
130            flowSubscriber = new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
131        }
132        return flowSubscriber;
133    }
134
135    /**
136     * Converts a Flow Subscriber into a Reactive Streams Subscriber.
137     * @param <T> the input and output value type
138     * @param flowSubscriber the Flow Subscriber instance to convert
139     * @return the equivalent Reactive Streams Subscriber
140     */
141    @SuppressWarnings("unchecked")
142    public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
143        requireNonNull(flowSubscriber, "flowSubscriber");
144        final org.reactivestreams.Subscriber<T> subscriber;
145        if (flowSubscriber instanceof FlowToReactiveSubscriber) {
146            subscriber = (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
147        } else if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
148            subscriber = (org.reactivestreams.Subscriber<T>)flowSubscriber;
149        } else {
150            subscriber = new ReactiveToFlowSubscriber<T>(flowSubscriber);
151        }
152        return subscriber;
153    }
154
155    /**
156     * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription.
157     */
158    static final class FlowToReactiveSubscription implements Flow.Subscription {
159        final org.reactivestreams.Subscription reactiveStreams;
160
161        public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
162            this.reactiveStreams = reactive;
163        }
164
165        @Override
166        public void request(long n) {
167            reactiveStreams.request(n);
168        }
169
170        @Override
171        public void cancel() {
172            reactiveStreams.cancel();
173        }
174
175    }
176
177    /**
178     * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription.
179     */
180    static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
181        final Flow.Subscription flow;
182
183        public ReactiveToFlowSubscription(Flow.Subscription flow) {
184            this.flow = flow;
185        }
186
187        @Override
188        public void request(long n) {
189            flow.request(n);
190        }
191
192        @Override
193        public void cancel() {
194            flow.cancel();
195        }
196
197
198    }
199
200    /**
201     * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
202     * @param <T> the element type
203     */
204    static final class FlowToReactiveSubscriber<T> implements Flow.Subscriber<T> {
205        final org.reactivestreams.Subscriber<? super T> reactiveStreams;
206
207        public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
208            this.reactiveStreams = reactive;
209        }
210
211        @Override
212        public void onSubscribe(Flow.Subscription subscription) {
213            reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
214        }
215
216        @Override
217        public void onNext(T item) {
218            reactiveStreams.onNext(item);
219        }
220
221        @Override
222        public void onError(Throwable throwable) {
223            reactiveStreams.onError(throwable);
224        }
225
226        @Override
227        public void onComplete() {
228            reactiveStreams.onComplete();
229        }
230
231    }
232
233    /**
234     * Wraps a Flow Subscriber and forwards methods of the Reactive Streams Subscriber to it.
235     * @param <T> the element type
236     */
237    static final class ReactiveToFlowSubscriber<T> implements org.reactivestreams.Subscriber<T> {
238        final Flow.Subscriber<? super T> flow;
239
240        public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
241            this.flow = flow;
242        }
243
244        @Override
245        public void onSubscribe(org.reactivestreams.Subscription subscription) {
246            flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
247        }
248
249        @Override
250        public void onNext(T item) {
251            flow.onNext(item);
252        }
253
254        @Override
255        public void onError(Throwable throwable) {
256            flow.onError(throwable);
257        }
258
259        @Override
260        public void onComplete() {
261            flow.onComplete();
262        }
263
264    }
265
266    /**
267     * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it.
268     * @param <T> the input type
269     * @param <U> the output type
270     */
271    static final class ReactiveToFlowProcessor<T, U> implements org.reactivestreams.Processor<T, U> {
272        final Flow.Processor<? super T, ? extends U> flow;
273
274        public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
275            this.flow = flow;
276        }
277
278        @Override
279        public void onSubscribe(org.reactivestreams.Subscription subscription) {
280            flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
281        }
282
283        @Override
284        public void onNext(T t) {
285            flow.onNext(t);
286        }
287
288        @Override
289        public void onError(Throwable t) {
290            flow.onError(t);
291        }
292
293        @Override
294        public void onComplete() {
295            flow.onComplete();
296        }
297
298        @Override
299        public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
300            flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s));
301        }
302    }
303
304    /**
305     * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it.
306     * @param <T> the input type
307     * @param <U> the output type
308     */
309    static final class FlowToReactiveProcessor<T, U> implements Flow.Processor<T, U> {
310        final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;
311
312        public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
313            this.reactiveStreams = reactive;
314        }
315
316        @Override
317        public void onSubscribe(Flow.Subscription subscription) {
318            reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
319        }
320
321        @Override
322        public void onNext(T t) {
323            reactiveStreams.onNext(t);
324        }
325
326        @Override
327        public void onError(Throwable t) {
328            reactiveStreams.onError(t);
329        }
330
331        @Override
332        public void onComplete() {
333            reactiveStreams.onComplete();
334        }
335
336        @Override
337        public void subscribe(Flow.Subscriber<? super U> s) {
338            reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s));
339        }
340    }
341
342    /**
343     * Reactive Streams Publisher that wraps a Flow Publisher.
344     * @param <T> the element type
345     */
346    static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> {
347        final Flow.Publisher<? extends T> flow;
348
349        public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
350            this.flow = flowPublisher;
351        }
352
353        @Override
354        public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
355            flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive));
356        }
357    }
358
359    /**
360     * Flow Publisher that wraps a Reactive Streams Publisher.
361     * @param <T> the element type
362     */
363    static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> {
364
365        final org.reactivestreams.Publisher<? extends T> reactiveStreams;
366
367        public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) {
368            this.reactiveStreams = reactivePublisher;
369        }
370
371        @Override
372        public void subscribe(Flow.Subscriber<? super T> flow) {
373            reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow));
374        }
375    }
376
377}