001/************************************************************************ 002 * Licensed under Public Domain (CC0) * 003 * * 004 * To the extent possible under law, the person who associated CC0 with * 005 * this code has waived all copyright and related or neighboring * 006 * rights to this code. * 007 * * 008 * You should have received a copy of the CC0 legalcode along with this * 009 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010 ************************************************************************/ 011 012package org.reactivestreams.example.unicast; 013 014import org.reactivestreams.*; 015 016import java.util.concurrent.atomic.AtomicLong; 017 018/** 019 * A synchronous implementation of the {@link Publisher} that can 020 * be subscribed to multiple times and each individual subscription 021 * will receive range of monotonically increasing integer values on demand. 022 */ 023public final class RangePublisher implements Publisher<Integer> { 024 025 /** The starting value of the range. */ 026 final int start; 027 028 /** The number of items to emit. */ 029 final int count; 030 031 /** 032 * Constructs a RangePublisher instance with the given start and count values 033 * that yields a sequence of [start, start + count). 034 * @param start the starting value of the range 035 * @param count the number of items to emit 036 */ 037 public RangePublisher(int start, int count) { 038 this.start = start; 039 this.count = count; 040 } 041 042 @Override 043 public void subscribe(Subscriber<? super Integer> subscriber) { 044 // As per rule 1.11, we have decided to support multiple subscribers 045 // in a unicast configuration for this `Publisher` implementation. 046 047 // As per rule 1.09, we need to throw a `java.lang.NullPointerException` 048 // if the `Subscriber` is `null` 049 if (subscriber == null) throw null; 050 051 // As per 2.13, this method must return normally (i.e. not throw). 052 try { 053 subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count)); 054 } catch (Throwable ex) { 055 new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 " + 056 "by throwing an exception from onSubscribe.", ex) 057 // When onSubscribe fails this way, we don't know what state the 058 // subscriber is thus calling onError may cause more crashes. 059 .printStackTrace(); 060 } 061 } 062 063 /** 064 * A Subscription implementation that holds the current downstream 065 * requested amount and responds to the downstream's request() and 066 * cancel() calls. 067 */ 068 static final class RangeSubscription 069 // We are using this `AtomicLong` to make sure that this `Subscription` 070 // doesn't run concurrently with itself, which would violate rule 1.3 071 // among others (no concurrent notifications). 072 // The atomic transition from 0L to N > 0L will ensure this. 073 extends AtomicLong implements Subscription { 074 075 private static final long serialVersionUID = -9000845542177067735L; 076 077 /** The Subscriber we are emitting integer values to. */ 078 final Subscriber<? super Integer> downstream; 079 080 /** The end index (exclusive). */ 081 final int end; 082 083 /** 084 * The current index and within the [start, start + count) range that 085 * will be emitted as downstream.onNext(). 086 */ 087 int index; 088 089 /** 090 * Indicates the emission should stop. 091 */ 092 volatile boolean cancelled; 093 094 /** 095 * Holds onto the IllegalArgumentException (containing the offending stacktrace) 096 * indicating there was a non-positive request() call from the downstream. 097 */ 098 volatile Throwable invalidRequest; 099 100 /** 101 * Constructs a stateful RangeSubscription that emits signals to the given 102 * downstream from an integer range of [start, end). 103 * @param downstream the Subscriber receiving the integer values and the completion signal. 104 * @param start the first integer value emitted, start of the range 105 * @param end the end of the range, exclusive 106 */ 107 RangeSubscription(Subscriber<? super Integer> downstream, int start, int end) { 108 this.downstream = downstream; 109 this.index = start; 110 this.end = end; 111 } 112 113 // This method will register inbound demand from our `Subscriber` and 114 // validate it against rule 3.9 and rule 3.17 115 @Override 116 public void request(long n) { 117 // Non-positive requests should be honored with IllegalArgumentException 118 if (n <= 0L) { 119 invalidRequest = new IllegalArgumentException("ยง3.9: non-positive requests are not allowed!"); 120 n = 1; 121 } 122 // Downstream requests are cumulative and may come from any thread 123 for (;;) { 124 long requested = get(); 125 long update = requested + n; 126 // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` 127 // we treat the signalled demand as "effectively unbounded" 128 if (update < 0L) { 129 update = Long.MAX_VALUE; 130 } 131 // atomically update the current requested amount 132 if (compareAndSet(requested, update)) { 133 // if there was no prior request amount, we start the emission loop 134 if (requested == 0L) { 135 emit(update); 136 } 137 break; 138 } 139 } 140 } 141 142 // This handles cancellation requests, and is idempotent, thread-safe and not 143 // synchronously performing heavy computations as specified in rule 3.5 144 @Override 145 public void cancel() { 146 // Indicate to the emission loop it should stop. 147 cancelled = true; 148 } 149 150 void emit(long currentRequested) { 151 // Load fields to avoid re-reading them from memory due to volatile accesses in the loop. 152 Subscriber<? super Integer> downstream = this.downstream; 153 int index = this.index; 154 int end = this.end; 155 int emitted = 0; 156 157 try { 158 for (; ; ) { 159 // Check if there was an invalid request and then report its exception 160 // as mandated by rule 3.9. The stacktrace in it should 161 // help locate the faulty logic in the Subscriber. 162 Throwable invalidRequest = this.invalidRequest; 163 if (invalidRequest != null) { 164 // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6 165 cancelled = true; 166 167 downstream.onError(invalidRequest); 168 return; 169 } 170 171 // Loop while the index hasn't reached the end and we haven't 172 // emitted all that's been requested 173 while (index != end && emitted != currentRequested) { 174 // to make sure that we follow rule 1.8, 3.6 and 3.7 175 // We stop if cancellation was requested. 176 if (cancelled) { 177 return; 178 } 179 180 downstream.onNext(index); 181 182 // Increment the index for the next possible emission. 183 index++; 184 // Increment the emitted count to prevent overflowing the downstream. 185 emitted++; 186 } 187 188 // If the index reached the end, we complete the downstream. 189 if (index == end) { 190 // to make sure that we follow rule 1.8, 3.6 and 3.7 191 // Unless cancellation was requested by the last onNext. 192 if (!cancelled) { 193 // We need to consider this `Subscription` as cancelled as per rule 1.6 194 // Note, however, that this state is not observable from the outside 195 // world and since we leave the loop with requested > 0L, any 196 // further request() will never trigger the loop. 197 cancelled = true; 198 199 downstream.onComplete(); 200 } 201 return; 202 } 203 204 // Did the requested amount change while we were looping? 205 long freshRequested = get(); 206 if (freshRequested == currentRequested) { 207 // Save where the loop has left off: the next value to be emitted 208 this.index = index; 209 // Atomically subtract the previously requested (also emitted) amount 210 currentRequested = addAndGet(-currentRequested); 211 // If there was no new request in between get() and addAndGet(), we simply quit 212 // The next 0 to N transition in request() will trigger the next emission loop. 213 if (currentRequested == 0L) { 214 break; 215 } 216 // Looks like there were more async requests, reset the emitted count and continue. 217 emitted = 0; 218 } else { 219 // Yes, avoid the atomic subtraction and resume. 220 // emitted != currentRequest in this case and index 221 // still points to the next value to be emitted 222 currentRequested = freshRequested; 223 } 224 } 225 } catch (Throwable ex) { 226 // We can only get here if `onNext`, `onError` or `onComplete` threw, and they 227 // are not allowed to according to 2.13, so we can only cancel and log here. 228 // If `onError` throws an exception, this is a spec violation according to rule 1.9, 229 // and all we can do is to log it. 230 231 // Make sure that we are cancelled, since we cannot do anything else 232 // since the `Subscriber` is faulty. 233 cancelled = true; 234 235 // We can't report the failure to onError as the Subscriber is unreliable. 236 (new IllegalStateException(downstream + " violated the Reactive Streams rule 2.13 by " + 237 "throwing an exception from onNext, onError or onComplete.", ex)) 238 .printStackTrace(); 239 } 240 } 241 } 242}