001/***************************************************
002 * Licensed under MIT No Attribution (SPDX: MIT-0) *
003 ***************************************************/
004
005package org.reactivestreams.example.unicast;
006
007import org.reactivestreams.*;
008
009import java.util.concurrent.atomic.AtomicLong;
010
011/**
012 * A synchronous implementation of the {@link Publisher} that can
013 * be subscribed to multiple times and each individual subscription
014 * will receive range of monotonically increasing integer values on demand.
015 */
016public final class RangePublisher implements Publisher<Integer> {
017
018    /** The starting value of the range. */
019    final int start;
020
021    /** The number of items to emit. */
022    final int count;
023
024    /**
025     * Constructs a RangePublisher instance with the given start and count values
026     * that yields a sequence of [start, start + count).
027     * @param start the starting value of the range
028     * @param count the number of items to emit
029     */
030    public RangePublisher(int start, int count) {
031        this.start = start;
032        this.count = count;
033    }
034
035    @Override
036    public void subscribe(Subscriber<? super Integer> subscriber) {
037        // As per rule 1.11, we have decided to support multiple subscribers
038        // in a unicast configuration for this `Publisher` implementation.
039
040        // As per rule 1.09, we need to throw a `java.lang.NullPointerException`
041        // if the `Subscriber` is `null`
042        if (subscriber == null) throw null;
043
044        // As per 2.13, this method must return normally (i.e. not throw).
045        try {
046            subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count));
047        } catch (Throwable ex) {
048            new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 " +
049                    "by throwing an exception from onSubscribe.", ex)
050                    // When onSubscribe fails this way, we don't know what state the
051                    // subscriber is thus calling onError may cause more crashes.
052                    .printStackTrace();
053        }
054    }
055
056    /**
057     * A Subscription implementation that holds the current downstream
058     * requested amount and responds to the downstream's request() and
059     * cancel() calls.
060     */
061    static final class RangeSubscription
062            // We are using this `AtomicLong` to make sure that this `Subscription`
063            // doesn't run concurrently with itself, which would violate rule 1.3
064            // among others (no concurrent notifications).
065            // The atomic transition from 0L to N > 0L will ensure this.
066            extends AtomicLong implements Subscription {
067
068        private static final long serialVersionUID = -9000845542177067735L;
069
070        /** The Subscriber we are emitting integer values to. */
071        final Subscriber<? super Integer> downstream;
072
073        /** The end index (exclusive). */
074        final int end;
075
076        /**
077         * The current index and within the [start, start + count) range that
078         * will be emitted as downstream.onNext().
079         */
080        int index;
081
082        /**
083         * Indicates the emission should stop.
084         */
085        volatile boolean cancelled;
086
087        /**
088         * Holds onto the IllegalArgumentException (containing the offending stacktrace)
089         * indicating there was a non-positive request() call from the downstream.
090         */
091        volatile Throwable invalidRequest;
092
093        /**
094         * Constructs a stateful RangeSubscription that emits signals to the given
095         * downstream from an integer range of [start, end).
096         * @param downstream the Subscriber receiving the integer values and the completion signal.
097         * @param start the first integer value emitted, start of the range
098         * @param end the end of the range, exclusive
099         */
100        RangeSubscription(Subscriber<? super Integer> downstream, int start, int end) {
101            this.downstream = downstream;
102            this.index = start;
103            this.end = end;
104        }
105
106        // This method will register inbound demand from our `Subscriber` and
107        // validate it against rule 3.9 and rule 3.17
108        @Override
109        public void request(long n) {
110            // Non-positive requests should be honored with IllegalArgumentException
111            if (n <= 0L) {
112                invalidRequest = new IllegalArgumentException("ยง3.9: non-positive requests are not allowed!");
113                n = 1;
114            }
115            // Downstream requests are cumulative and may come from any thread
116            for (;;) {
117                long requested = get();
118                long update = requested + n;
119                // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE`
120                // we treat the signalled demand as "effectively unbounded"
121                if (update < 0L) {
122                    update = Long.MAX_VALUE;
123                }
124                // atomically update the current requested amount
125                if (compareAndSet(requested, update)) {
126                    // if there was no prior request amount, we start the emission loop
127                    if (requested == 0L) {
128                        emit(update);
129                    }
130                    break;
131                }
132            }
133        }
134
135        // This handles cancellation requests, and is idempotent, thread-safe and not
136        // synchronously performing heavy computations as specified in rule 3.5
137        @Override
138        public void cancel() {
139            // Indicate to the emission loop it should stop.
140            cancelled = true;
141        }
142
143        void emit(long currentRequested) {
144            // Load fields to avoid re-reading them from memory due to volatile accesses in the loop.
145            Subscriber<? super Integer> downstream = this.downstream;
146            int index = this.index;
147            int end = this.end;
148            int emitted = 0;
149
150            try {
151                for (; ; ) {
152                    // Check if there was an invalid request and then report its exception
153                    // as mandated by rule 3.9. The stacktrace in it should
154                    // help locate the faulty logic in the Subscriber.
155                    Throwable invalidRequest = this.invalidRequest;
156                    if (invalidRequest != null) {
157                        // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6
158                        cancelled = true;
159
160                        downstream.onError(invalidRequest);
161                        return;
162                    }
163
164                    // Loop while the index hasn't reached the end and we haven't
165                    // emitted all that's been requested
166                    while (index != end && emitted != currentRequested) {
167                        // to make sure that we follow rule 1.8, 3.6 and 3.7
168                        // We stop if cancellation was requested.
169                        if (cancelled) {
170                            return;
171                        }
172
173                        downstream.onNext(index);
174
175                        // Increment the index for the next possible emission.
176                        index++;
177                        // Increment the emitted count to prevent overflowing the downstream.
178                        emitted++;
179                    }
180
181                    // If the index reached the end, we complete the downstream.
182                    if (index == end) {
183                        // to make sure that we follow rule 1.8, 3.6 and 3.7
184                        // Unless cancellation was requested by the last onNext.
185                        if (!cancelled) {
186                            // We need to consider this `Subscription` as cancelled as per rule 1.6
187                            // Note, however, that this state is not observable from the outside
188                            // world and since we leave the loop with requested > 0L, any
189                            // further request() will never trigger the loop.
190                            cancelled = true;
191
192                            downstream.onComplete();
193                        }
194                        return;
195                    }
196
197                    // Did the requested amount change while we were looping?
198                    long freshRequested = get();
199                    if (freshRequested == currentRequested) {
200                        // Save where the loop has left off: the next value to be emitted
201                        this.index = index;
202                        // Atomically subtract the previously requested (also emitted) amount
203                        currentRequested = addAndGet(-currentRequested);
204                        // If there was no new request in between get() and addAndGet(), we simply quit
205                        // The next 0 to N transition in request() will trigger the next emission loop.
206                        if (currentRequested == 0L) {
207                            break;
208                        }
209                        // Looks like there were more async requests, reset the emitted count and continue.
210                        emitted = 0;
211                    } else {
212                        // Yes, avoid the atomic subtraction and resume.
213                        // emitted != currentRequest in this case and index
214                        // still points to the next value to be emitted
215                        currentRequested = freshRequested;
216                    }
217                }
218            } catch (Throwable ex) {
219                // We can only get here if `onNext`, `onError` or `onComplete` threw, and they
220                // are not allowed to according to 2.13, so we can only cancel and log here.
221                // If `onError` throws an exception, this is a spec violation according to rule 1.9,
222                // and all we can do is to log it.
223
224                // Make sure that we are cancelled, since we cannot do anything else
225                // since the `Subscriber` is faulty.
226                cancelled = true;
227
228                // We can't report the failure to onError as the Subscriber is unreliable.
229                (new IllegalStateException(downstream + " violated the Reactive Streams rule 2.13 by " +
230                        "throwing an exception from onNext, onError or onComplete.", ex))
231                        .printStackTrace();
232            }
233        }
234    }
235}