001/***************************************************
002 * Licensed under MIT No Attribution (SPDX: MIT-0) *
003 ***************************************************/
004
005package org.reactivestreams;
006
007import java.util.concurrent.Flow;
008import static java.util.Objects.requireNonNull;
009
010/**
011 * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
012 */
013public final class FlowAdapters {
014    /** Utility class. */
015    private FlowAdapters() {
016        throw new IllegalStateException("No instances!");
017    }
018
019    /**
020     * Converts a Flow Publisher into a Reactive Streams Publisher.
021     * @param <T> the element type
022     * @param flowPublisher the source Flow Publisher to convert
023     * @return the equivalent Reactive Streams Publisher
024     */
025    @SuppressWarnings("unchecked")
026    public static <T> org.reactivestreams.Publisher<T> toPublisher(
027            Flow.Publisher<? extends T> flowPublisher) {
028        requireNonNull(flowPublisher, "flowPublisher");
029        final org.reactivestreams.Publisher<T> publisher;
030        if (flowPublisher instanceof FlowPublisherFromReactive) {
031            publisher = (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
032        } else if (flowPublisher instanceof org.reactivestreams.Publisher) {
033            publisher = (org.reactivestreams.Publisher<T>)flowPublisher;
034        } else {
035            publisher = new ReactivePublisherFromFlow<T>(flowPublisher);
036        }
037        return publisher;
038    }
039
040    /**
041     * Converts a Reactive Streams Publisher into a Flow Publisher.
042     * @param <T> the element type
043     * @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert
044     * @return the equivalent Flow Publisher
045     */
046    @SuppressWarnings("unchecked")
047    public static <T> Flow.Publisher<T> toFlowPublisher(
048            org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
049    ) {
050        requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher");
051        final Flow.Publisher<T> flowPublisher;
052        if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
053            flowPublisher = (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
054        } else if (reactiveStreamsPublisher instanceof Flow.Publisher) {
055            flowPublisher = (Flow.Publisher<T>)reactiveStreamsPublisher;
056        } else {
057            flowPublisher = new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
058        }
059        return flowPublisher;
060    }
061
062    /**
063     * Converts a Flow Processor into a Reactive Streams Processor.
064     * @param <T> the input value type
065     * @param <U> the output value type
066     * @param flowProcessor the source Flow Processor to convert
067     * @return the equivalent Reactive Streams Processor
068     */
069    @SuppressWarnings("unchecked")
070    public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
071            Flow.Processor<? super T, ? extends U> flowProcessor
072    ) {
073        requireNonNull(flowProcessor, "flowProcessor");
074        final org.reactivestreams.Processor<T, U> processor;
075        if (flowProcessor instanceof FlowToReactiveProcessor) {
076            processor = (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
077        } else if (flowProcessor instanceof org.reactivestreams.Processor) {
078            processor = (org.reactivestreams.Processor<T, U>)flowProcessor;
079        } else {
080            processor = new ReactiveToFlowProcessor<T, U>(flowProcessor);
081        }
082        return processor;
083    }
084
085    /**
086     * Converts a Reactive Streams Processor into a Flow Processor.
087     * @param <T> the input value type
088     * @param <U> the output value type
089     * @param reactiveStreamsProcessor the source Reactive Streams Processor to convert
090     * @return the equivalent Flow Processor
091     */
092    @SuppressWarnings("unchecked")
093    public static <T, U> Flow.Processor<T, U> toFlowProcessor(
094            org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
095        ) {
096        requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor");
097        final Flow.Processor<T, U> flowProcessor;
098        if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
099            flowProcessor = (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
100        } else if (reactiveStreamsProcessor instanceof Flow.Processor) {
101            flowProcessor = (Flow.Processor<T, U>)reactiveStreamsProcessor;
102        } else {
103            flowProcessor = new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
104        }
105        return flowProcessor;
106    }
107
108    /**
109     * Converts a Reactive Streams Subscriber into a Flow Subscriber.
110     * @param <T> the input and output value type
111     * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
112     * @return the equivalent Flow Subscriber
113     */
114    @SuppressWarnings("unchecked")
115    public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
116        requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber");
117        final Flow.Subscriber<T> flowSubscriber;
118        if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
119            flowSubscriber = (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
120        } else if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
121            flowSubscriber = (Flow.Subscriber<T>)reactiveStreamsSubscriber;
122        } else {
123            flowSubscriber = new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
124        }
125        return flowSubscriber;
126    }
127
128    /**
129     * Converts a Flow Subscriber into a Reactive Streams Subscriber.
130     * @param <T> the input and output value type
131     * @param flowSubscriber the Flow Subscriber instance to convert
132     * @return the equivalent Reactive Streams Subscriber
133     */
134    @SuppressWarnings("unchecked")
135    public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
136        requireNonNull(flowSubscriber, "flowSubscriber");
137        final org.reactivestreams.Subscriber<T> subscriber;
138        if (flowSubscriber instanceof FlowToReactiveSubscriber) {
139            subscriber = (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
140        } else if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
141            subscriber = (org.reactivestreams.Subscriber<T>)flowSubscriber;
142        } else {
143            subscriber = new ReactiveToFlowSubscriber<T>(flowSubscriber);
144        }
145        return subscriber;
146    }
147
148    /**
149     * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription.
150     */
151    static final class FlowToReactiveSubscription implements Flow.Subscription {
152        final org.reactivestreams.Subscription reactiveStreams;
153
154        public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
155            this.reactiveStreams = reactive;
156        }
157
158        @Override
159        public void request(long n) {
160            reactiveStreams.request(n);
161        }
162
163        @Override
164        public void cancel() {
165            reactiveStreams.cancel();
166        }
167
168    }
169
170    /**
171     * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription.
172     */
173    static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
174        final Flow.Subscription flow;
175
176        public ReactiveToFlowSubscription(Flow.Subscription flow) {
177            this.flow = flow;
178        }
179
180        @Override
181        public void request(long n) {
182            flow.request(n);
183        }
184
185        @Override
186        public void cancel() {
187            flow.cancel();
188        }
189
190
191    }
192
193    /**
194     * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
195     * @param <T> the element type
196     */
197    static final class FlowToReactiveSubscriber<T> implements Flow.Subscriber<T> {
198        final org.reactivestreams.Subscriber<? super T> reactiveStreams;
199
200        public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
201            this.reactiveStreams = reactive;
202        }
203
204        @Override
205        public void onSubscribe(Flow.Subscription subscription) {
206            reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
207        }
208
209        @Override
210        public void onNext(T item) {
211            reactiveStreams.onNext(item);
212        }
213
214        @Override
215        public void onError(Throwable throwable) {
216            reactiveStreams.onError(throwable);
217        }
218
219        @Override
220        public void onComplete() {
221            reactiveStreams.onComplete();
222        }
223
224    }
225
226    /**
227     * Wraps a Flow Subscriber and forwards methods of the Reactive Streams Subscriber to it.
228     * @param <T> the element type
229     */
230    static final class ReactiveToFlowSubscriber<T> implements org.reactivestreams.Subscriber<T> {
231        final Flow.Subscriber<? super T> flow;
232
233        public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
234            this.flow = flow;
235        }
236
237        @Override
238        public void onSubscribe(org.reactivestreams.Subscription subscription) {
239            flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
240        }
241
242        @Override
243        public void onNext(T item) {
244            flow.onNext(item);
245        }
246
247        @Override
248        public void onError(Throwable throwable) {
249            flow.onError(throwable);
250        }
251
252        @Override
253        public void onComplete() {
254            flow.onComplete();
255        }
256
257    }
258
259    /**
260     * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it.
261     * @param <T> the input type
262     * @param <U> the output type
263     */
264    static final class ReactiveToFlowProcessor<T, U> implements org.reactivestreams.Processor<T, U> {
265        final Flow.Processor<? super T, ? extends U> flow;
266
267        public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
268            this.flow = flow;
269        }
270
271        @Override
272        public void onSubscribe(org.reactivestreams.Subscription subscription) {
273            flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
274        }
275
276        @Override
277        public void onNext(T t) {
278            flow.onNext(t);
279        }
280
281        @Override
282        public void onError(Throwable t) {
283            flow.onError(t);
284        }
285
286        @Override
287        public void onComplete() {
288            flow.onComplete();
289        }
290
291        @Override
292        public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
293            flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s));
294        }
295    }
296
297    /**
298     * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it.
299     * @param <T> the input type
300     * @param <U> the output type
301     */
302    static final class FlowToReactiveProcessor<T, U> implements Flow.Processor<T, U> {
303        final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;
304
305        public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
306            this.reactiveStreams = reactive;
307        }
308
309        @Override
310        public void onSubscribe(Flow.Subscription subscription) {
311            reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
312        }
313
314        @Override
315        public void onNext(T t) {
316            reactiveStreams.onNext(t);
317        }
318
319        @Override
320        public void onError(Throwable t) {
321            reactiveStreams.onError(t);
322        }
323
324        @Override
325        public void onComplete() {
326            reactiveStreams.onComplete();
327        }
328
329        @Override
330        public void subscribe(Flow.Subscriber<? super U> s) {
331            reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s));
332        }
333    }
334
335    /**
336     * Reactive Streams Publisher that wraps a Flow Publisher.
337     * @param <T> the element type
338     */
339    static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> {
340        final Flow.Publisher<? extends T> flow;
341
342        public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
343            this.flow = flowPublisher;
344        }
345
346        @Override
347        public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
348            flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive));
349        }
350    }
351
352    /**
353     * Flow Publisher that wraps a Reactive Streams Publisher.
354     * @param <T> the element type
355     */
356    static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> {
357
358        final org.reactivestreams.Publisher<? extends T> reactiveStreams;
359
360        public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) {
361            this.reactiveStreams = reactivePublisher;
362        }
363
364        @Override
365        public void subscribe(Flow.Subscriber<? super T> flow) {
366            reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow));
367        }
368    }
369
370}