001/************************************************************************ 002* Licensed under Public Domain (CC0) * 003* * 004* To the extent possible under law, the person who associated CC0 with * 005* this code has waived all copyright and related or neighboring * 006* rights to this code. * 007* * 008* You should have received a copy of the CC0 legalcode along with this * 009* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* 010************************************************************************/ 011 012package org.reactivestreams.example.unicast; 013 014import org.reactivestreams.Subscriber; 015import org.reactivestreams.Subscription; 016 017/** 018 * SyncSubscriber is an implementation of Reactive Streams `Subscriber`, 019 * it runs synchronously (on the Publisher's thread) and requests one element 020 * at a time and invokes a user-defined method to process each element. 021 * 022 * NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden. 023 */ 024public abstract class SyncSubscriber<T> implements Subscriber<T> { 025 private Subscription subscription; // Obeying rule 3.1, we make this private! 026 private boolean done = false; 027 028 @Override public void onSubscribe(final Subscription s) { 029 // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null` 030 if (s == null) throw null; 031 032 if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully 033 try { 034 s.cancel(); // Cancel the additional subscription 035 } catch(final Throwable t) { 036 //Subscription.cancel is not allowed to throw an exception, according to rule 3.15 037 (new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err); 038 } 039 } else { 040 // We have to assign it locally before we use it, if we want to be a synchronous `Subscriber` 041 // Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request` 042 subscription = s; 043 try { 044 // If we want elements, according to rule 2.1 we need to call `request` 045 // And, according to rule 3.2 we are allowed to call this synchronously from within the `onSubscribe` method 046 s.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time 047 } catch(final Throwable t) { 048 // Subscription.request is not allowed to throw according to rule 3.16 049 (new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err); 050 } 051 } 052 } 053 054 @Override public void onNext(final T element) { 055 if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec 056 (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err); 057 } else { 058 // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null` 059 if (element == null) throw null; 060 061 if (!done) { // If we aren't already done 062 try { 063 if (whenNext(element)) { 064 try { 065 subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time 066 } catch (final Throwable t) { 067 // Subscription.request is not allowed to throw according to rule 3.16 068 (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err); 069 } 070 } else { 071 done(); 072 } 073 } catch (final Throwable t) { 074 done(); 075 try { 076 onError(t); 077 } catch (final Throwable t2) { 078 //Subscriber.onError is not allowed to throw an exception, according to rule 2.13 079 (new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); 080 } 081 } 082 } 083 } 084 } 085 086 // Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements 087 // herefor we also need to cancel our `Subscription`. 088 private void done() { 089 //On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to. 090 done = true; // If we `whenNext` throws an exception, let's consider ourselves done (not accepting more elements) 091 try { 092 subscription.cancel(); // Cancel the subscription 093 } catch(final Throwable t) { 094 //Subscription.cancel is not allowed to throw an exception, according to rule 3.15 095 (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err); 096 } 097 } 098 099 // This method is left as an exercise to the reader/extension point 100 // Returns whether more elements are desired or not, and if no more elements are desired 101 protected abstract boolean whenNext(final T element); 102 103 @Override public void onError(final Throwable t) { 104 if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec 105 (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err); 106 } else { 107 // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null` 108 if (t == null) throw null; 109 // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 110 // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 111 } 112 } 113 114 @Override public void onComplete() { 115 if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec 116 (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err); 117 } else { 118 // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3 119 // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4 120 } 121 } 122}