001/************************************************************************
002 * Licensed under Public Domain (CC0)                                    *
003 *                                                                       *
004 * To the extent possible under law, the person who associated CC0 with  *
005 * this code has waived all copyright and related or neighboring         *
006 * rights to this code.                                                  *
007 *                                                                       *
008 * You should have received a copy of the CC0 legalcode along with this  *
009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
010 ************************************************************************/
011
012package org.reactivestreams.example.unicast;
013
014import org.reactivestreams.*;
015
016import java.util.concurrent.atomic.AtomicLong;
017
018/**
019 * A synchronous implementation of the {@link Publisher} that can
020 * be subscribed to multiple times and each individual subscription
021 * will receive range of monotonically increasing integer values on demand.
022 */
023public final class RangePublisher implements Publisher<Integer> {
024
025    /** The starting value of the range. */
026    final int start;
027
028    /** The number of items to emit. */
029    final int count;
030
031    /**
032     * Constructs a RangePublisher instance with the given start and count values
033     * that yields a sequence of [start, start + count).
034     * @param start the starting value of the range
035     * @param count the number of items to emit
036     */
037    public RangePublisher(int start, int count) {
038        this.start = start;
039        this.count = count;
040    }
041
042    @Override
043    public void subscribe(Subscriber<? super Integer> subscriber) {
044        // As per rule 1.11, we have decided to support multiple subscribers
045        // in a unicast configuration for this `Publisher` implementation.
046
047        // As per rule 1.09, we need to throw a `java.lang.NullPointerException`
048        // if the `Subscriber` is `null`
049        if (subscriber == null) throw null;
050
051        // As per 2.13, this method must return normally (i.e. not throw).
052        try {
053            subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count));
054        } catch (Throwable ex) {
055            new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 " +
056                    "by throwing an exception from onSubscribe.", ex)
057                    // When onSubscribe fails this way, we don't know what state the
058                    // subscriber is thus calling onError may cause more crashes.
059                    .printStackTrace();
060        }
061    }
062
063    /**
064     * A Subscription implementation that holds the current downstream
065     * requested amount and responds to the downstream's request() and
066     * cancel() calls.
067     */
068    static final class RangeSubscription
069            // We are using this `AtomicLong` to make sure that this `Subscription`
070            // doesn't run concurrently with itself, which would violate rule 1.3
071            // among others (no concurrent notifications).
072            // The atomic transition from 0L to N > 0L will ensure this.
073            extends AtomicLong implements Subscription {
074
075        private static final long serialVersionUID = -9000845542177067735L;
076
077        /** The Subscriber we are emitting integer values to. */
078        final Subscriber<? super Integer> downstream;
079
080        /** The end index (exclusive). */
081        final int end;
082
083        /**
084         * The current index and within the [start, start + count) range that
085         * will be emitted as downstream.onNext().
086         */
087        int index;
088
089        /**
090         * Indicates the emission should stop.
091         */
092        volatile boolean cancelled;
093
094        /**
095         * Holds onto the IllegalArgumentException (containing the offending stacktrace)
096         * indicating there was a non-positive request() call from the downstream.
097         */
098        volatile Throwable invalidRequest;
099
100        /**
101         * Constructs a stateful RangeSubscription that emits signals to the given
102         * downstream from an integer range of [start, end).
103         * @param downstream the Subscriber receiving the integer values and the completion signal.
104         * @param start the first integer value emitted, start of the range
105         * @param end the end of the range, exclusive
106         */
107        RangeSubscription(Subscriber<? super Integer> downstream, int start, int end) {
108            this.downstream = downstream;
109            this.index = start;
110            this.end = end;
111        }
112
113        // This method will register inbound demand from our `Subscriber` and
114        // validate it against rule 3.9 and rule 3.17
115        @Override
116        public void request(long n) {
117            // Non-positive requests should be honored with IllegalArgumentException
118            if (n <= 0L) {
119                invalidRequest = new IllegalArgumentException("ยง3.9: non-positive requests are not allowed!");
120                n = 1;
121            }
122            // Downstream requests are cumulative and may come from any thread
123            for (;;) {
124                long requested = get();
125                long update = requested + n;
126                // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE`
127                // we treat the signalled demand as "effectively unbounded"
128                if (update < 0L) {
129                    update = Long.MAX_VALUE;
130                }
131                // atomically update the current requested amount
132                if (compareAndSet(requested, update)) {
133                    // if there was no prior request amount, we start the emission loop
134                    if (requested == 0L) {
135                        emit(update);
136                    }
137                    break;
138                }
139            }
140        }
141
142        // This handles cancellation requests, and is idempotent, thread-safe and not
143        // synchronously performing heavy computations as specified in rule 3.5
144        @Override
145        public void cancel() {
146            // Indicate to the emission loop it should stop.
147            cancelled = true;
148        }
149
150        void emit(long currentRequested) {
151            // Load fields to avoid re-reading them from memory due to volatile accesses in the loop.
152            Subscriber<? super Integer> downstream = this.downstream;
153            int index = this.index;
154            int end = this.end;
155            int emitted = 0;
156
157            try {
158                for (; ; ) {
159                    // Check if there was an invalid request and then report its exception
160                    // as mandated by rule 3.9. The stacktrace in it should
161                    // help locate the faulty logic in the Subscriber.
162                    Throwable invalidRequest = this.invalidRequest;
163                    if (invalidRequest != null) {
164                        // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6
165                        cancelled = true;
166
167                        downstream.onError(invalidRequest);
168                        return;
169                    }
170
171                    // Loop while the index hasn't reached the end and we haven't
172                    // emitted all that's been requested
173                    while (index != end && emitted != currentRequested) {
174                        // to make sure that we follow rule 1.8, 3.6 and 3.7
175                        // We stop if cancellation was requested.
176                        if (cancelled) {
177                            return;
178                        }
179
180                        downstream.onNext(index);
181
182                        // Increment the index for the next possible emission.
183                        index++;
184                        // Increment the emitted count to prevent overflowing the downstream.
185                        emitted++;
186                    }
187
188                    // If the index reached the end, we complete the downstream.
189                    if (index == end) {
190                        // to make sure that we follow rule 1.8, 3.6 and 3.7
191                        // Unless cancellation was requested by the last onNext.
192                        if (!cancelled) {
193                            // We need to consider this `Subscription` as cancelled as per rule 1.6
194                            // Note, however, that this state is not observable from the outside
195                            // world and since we leave the loop with requested > 0L, any
196                            // further request() will never trigger the loop.
197                            cancelled = true;
198
199                            downstream.onComplete();
200                        }
201                        return;
202                    }
203
204                    // Did the requested amount change while we were looping?
205                    long freshRequested = get();
206                    if (freshRequested == currentRequested) {
207                        // Save where the loop has left off: the next value to be emitted
208                        this.index = index;
209                        // Atomically subtract the previously requested (also emitted) amount
210                        currentRequested = addAndGet(-currentRequested);
211                        // If there was no new request in between get() and addAndGet(), we simply quit
212                        // The next 0 to N transition in request() will trigger the next emission loop.
213                        if (currentRequested == 0L) {
214                            break;
215                        }
216                        // Looks like there were more async requests, reset the emitted count and continue.
217                        emitted = 0;
218                    } else {
219                        // Yes, avoid the atomic subtraction and resume.
220                        // emitted != currentRequest in this case and index
221                        // still points to the next value to be emitted
222                        currentRequested = freshRequested;
223                    }
224                }
225            } catch (Throwable ex) {
226                // We can only get here if `onNext`, `onError` or `onComplete` threw, and they
227                // are not allowed to according to 2.13, so we can only cancel and log here.
228                // If `onError` throws an exception, this is a spec violation according to rule 1.9,
229                // and all we can do is to log it.
230
231                // Make sure that we are cancelled, since we cannot do anything else
232                // since the `Subscriber` is faulty.
233                cancelled = true;
234
235                // We can't report the failure to onError as the Subscriber is unreliable.
236                (new IllegalStateException(downstream + " violated the Reactive Streams rule 2.13 by " +
237                        "throwing an exception from onNext, onError or onComplete.", ex))
238                        .printStackTrace();
239            }
240        }
241    }
242}