PublishSubject source code analysis
Copyright 2014 Netflix, Inc. ** Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
package rx.subjects;
import java.util.*;
import java.util.concurrent.atomic.*;
import rx.*;
import rx.Observer;
import rx.exceptions.*;
import rx.internal.operators.BackpressureUtils;
/**
* Subject that, once an {@link Observer} has subscribed, emits all subsequently observed items to the
* subscriber.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.PublishSubject.png" alt="">
* <p>
* Example usage:
* <p>
* <pre> {@code
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onCompleted events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onCompleted
subject.subscribe(observer2);
subject.onNext("three");
subject.onCompleted();
} </pre>
*
* @param <T>
* the type of items observed and emitted by the Subject
*/
public final class PublishSubject<T> extends Subject<T.T> {
final PublishSubjectState<T> state;
/**
* Creates and returns a new {@code PublishSubject}.
*
* @param <T> the value type
* @return the new {@code PublishSubject}
*/
public static <T> PublishSubject<T> create(a) {
return new PublishSubject<T>(new PublishSubjectState<T>());
}
protected PublishSubject(PublishSubjectState<T> state) {
super(state);
this.state = state;
}
@Override
public void onNext(T v) {
state.onNext(v);
}
@Override
public void onError(Throwable e) {
state.onError(e);
}
@Override
public void onCompleted(a) {
state.onCompleted();
}
@Override
public boolean hasObservers(a) {
returnstate.get().length ! =0;
}
/**
* Check if the Subject has terminated with an exception.
* @return true if the subject has received a throwable through {@code onError}.
* @since1.2 * /
public boolean hasThrowable(a) {
returnstate.get() == PublishSubjectState.TERMINATED && state.error ! =null;
}
/**
* Check if the Subject has terminated normally.
* @return true if the subject completed normally via {@code onCompleted}
* @since1.2 * /
public boolean hasCompleted(a) {
return state.get() == PublishSubjectState.TERMINATED && state.error == null;
}
/**
* Returns the Throwable that terminated the Subject.
* @return the Throwable that terminated the Subject or {@code null} if the
* subject hasn't terminated yet or it terminated normally.
* @since1.2 * /
public Throwable getThrowable(a) {
if (state.get() == PublishSubjectState.TERMINATED) {
return state.error;
}
return null;
}
static final class PublishSubjectState<T>
extends AtomicReference<PublishSubjectProducer<T> > []implements OnSubscribe<T>, Observer<T> {
/ * * * /
private static final long serialVersionUID = -7568940796666027140L;
@SuppressWarnings("rawtypes")
static final PublishSubjectProducer[] EMPTY = new PublishSubjectProducer[0];
@SuppressWarnings("rawtypes")
static final PublishSubjectProducer[] TERMINATED = new PublishSubjectProducer[0];
Throwable error;
@SuppressWarnings("unchecked")
public PublishSubjectState(a) {
lazySet(EMPTY);
}
@Override
public void call(Subscriber<? super T> t) {
PublishSubjectProducer<T> pp = new PublishSubjectProducer<T>(this, t);
t.add(pp);
t.setProducer(pp);
if (add(pp)) {
if(pp.isUnsubscribed()) { remove(pp); }}else {
Throwable ex = error;
if(ex ! =null) {
t.onError(ex);
} else{ t.onCompleted(); }}}boolean add(PublishSubjectProducer<T> inner) {
for (;;) {
PublishSubjectProducer<T>[] curr = get();
if (curr == TERMINATED) {
return false;
}
int n = curr.length;
@SuppressWarnings("unchecked")
PublishSubjectProducer<T>[] next = new PublishSubjectProducer[n + 1];
System.arraycopy(curr, 0, next, 0, n);
next[n] = inner;
if (compareAndSet(curr, next)) {
return true; }}}@SuppressWarnings("unchecked")
void remove(PublishSubjectProducer<T> inner) {
for (;;) {
PublishSubjectProducer<T>[] curr = get();
if (curr == TERMINATED || curr == EMPTY) {
return;
}
int n = curr.length;
int j = -1;
for (int i = 0; i < n; i++) {
if (curr[i] == inner) {
j = i;
break; }}if (j < 0) {
return;
}
PublishSubjectProducer<T>[] next;
if (n == 1) {
next = EMPTY;
} else {
next = new PublishSubjectProducer[n - 1];
System.arraycopy(curr, 0, next, 0, j);
System.arraycopy(curr, j + 1, next, j, n - j - 1);
}
if (compareAndSet(curr, next)) {
return; }}}@Override
public void onNext(T t) {
for(PublishSubjectProducer<T> pp : get()) { pp.onNext(t); }}@SuppressWarnings("unchecked")
@Override
public void onError(Throwable e) {
error = e;
List<Throwable> errors = null;
for (PublishSubjectProducer<T> pp : getAndSet(TERMINATED)) {
try {
pp.onError(e);
} catch (Throwable ex) {
if (errors == null) {
errors = new ArrayList<Throwable>(1);
}
errors.add(ex);
}
}
Exceptions.throwIfAny(errors);
}
@SuppressWarnings("unchecked")
@Override
public void onCompleted(a) {
for(PublishSubjectProducer<T> pp : getAndSet(TERMINATED)) { pp.onCompleted(); }}}static final class PublishSubjectProducer<T>
extends AtomicLong
implements Producer.Subscription.Observer<T> {
/ * * * /
private static final long serialVersionUID = 6451806817170721536L;
final PublishSubjectState<T> parent;
final Subscriber<? super T> actual;
long produced;
public PublishSubjectProducer(PublishSubjectState<T> parent, Subscriber<? super T> actual) {
this.parent = parent;
this.actual = actual;
}
@Override
public void request(long n) {
if (BackpressureUtils.validate(n)) {
for (;;) {
long r = get();
if (r == Long.MIN_VALUE) {
return;
}
long u = BackpressureUtils.addCap(r, n);
if (compareAndSet(r, u)) {
return; }}}}@Override
public boolean isUnsubscribed(a) {
return get() == Long.MIN_VALUE;
}
@Override
public void unsubscribe(a) {
if(getAndSet(Long.MIN_VALUE) ! = Long.MIN_VALUE) { parent.remove(this); }}@Override
public void onNext(T t) {
long r = get();
if(r ! = Long.MIN_VALUE) {long p = produced;
if(r ! = p) { produced = p +1;
actual.onNext(t);
} else {
unsubscribe();
actual.onError(new MissingBackpressureException("PublishSubject: could not emit value due to lack of requests")); }}}@Override
public void onError(Throwable e) {
if (get() != Long.MIN_VALUE) {
actual.onError(e);
}
}
@Override
public void onCompleted(a) {
if(get() ! = Long.MIN_VALUE) { actual.onCompleted(); }}}}Copy the code
Observable: publishsubject.create ();
Step 2: In the Create() method of the PublishSubject class, the observer manager PublishSubjectProducer is created
Step 3: Create a New PublishSubject(state,state) object in the Create method of the PublishSubject and pass in the manager
Step 4: Create a new PublishSubject(state,state) object in the.call method of the PublishSubject and pass the manager to step 5: The new PublishSubject(state,state) object is created in the call() method and the manager is passed in
BehaviorSubject use
private BehaviorSubject<String> behaviorSubject;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_simple2);
// I send a message by default
behaviorSubject = BehaviorSubject.create("Send default message");
}
public void click(View v) {
behaviorSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted(a) {
Log.i("main"."Do...");
}
@Override
public void onError(Throwable e) {
Log.i("main"."Abnormal...");
}
@Override
public void onNext(String message) {
Log.i("main"."News:"+ message); }}); }public void send(View v) {
behaviorSubject.onNext("Hello, hold on.....");
}
Copy the code
Result output:
08-03 10:26:07. 037, 18544-18544 / com haocai. Architect. Rxjava I/main: message: send message by defaultCopy the code
BehaviorSubject source
public final class BehaviorSubject<T> extends Subject<T.T> {
/** An empty array to trigger getValues() to return a new array. */
private static final Object[] EMPTY_ARRAY = new Object[0];
private final SubjectSubscriptionManager<T> state;
/**
* Creates a {@link BehaviorSubject} without a default item.
*
* @param <T>
* the type of item the Subject will emit
* @return the constructed {@link BehaviorSubject}
*/
public static <T> BehaviorSubject<T> create(a) {
return create(null.false);
}
/**
* Creates a {@link BehaviorSubject} that emits the last item it observed and all subsequent items to each
* {@link Observer} that subscribes to it.
*
* @param <T>
* the type of item the Subject will emit
* @param defaultValue
* the item that will be emitted first to any {@link Observer} as long as the
* {@link BehaviorSubject} has not yet observed any items from its source {@code Observable}
* @return the constructed {@link BehaviorSubject}
*/
public static <T> BehaviorSubject<T> create(T defaultValue) {
return create(defaultValue, true);
}
private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
if (hasDefault) {
state.setLatest(NotificationLite.next(defaultValue));
}
state.onAdded = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) { o.emitFirst(state.getLatest()); }}; state.onTerminated = state.onAdded;return new BehaviorSubject<T>(state, state);
}
protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
super(onSubscribe);
this.state = state;
}
@Override
public void onCompleted(a) {
Object last = state.getLatest();
if (last == null || state.active) {
Object n = NotificationLite.completed();
for(SubjectObserver<T> bo : state.terminate(n)) { bo.emitNext(n); }}}@Override
public void onError(Throwable e) {
Object last = state.getLatest();
if (last == null || state.active) {
Object n = NotificationLite.error(e);
List<Throwable> errors = null;
for (SubjectObserver<T> bo : state.terminate(n)) {
try {
bo.emitNext(n);
} catch (Throwable e2) {
if (errors == null) {
errors = newArrayList<Throwable>(); } errors.add(e2); } } Exceptions.throwIfAny(errors); }}@Override
public void onNext(T v) {
Object last = state.getLatest();
if (last == null || state.active) {
Object n = NotificationLite.next(v);
for(SubjectObserver<T> bo : state.next(n)) { bo.emitNext(n); }}}/* test support */ int subscriberCount(a) {
return state.observers().length;
}
@Override
public boolean hasObservers(a) {
return state.observers().length > 0;
}
/**
* Check if the Subject has a value.
* <p>Use the {@link #getValue()} method to retrieve such a value.
* <p>Note that unless {@link #hasCompleted()} or {@link #hasThrowable()} returns true, the value
* retrieved by {@code getValue()} may get outdated.
* @return true if and only if the subject has some value and hasn't terminated yet.
* @since1.2 * /
public boolean hasValue(a) {
Object o = state.getLatest();
return NotificationLite.isNext(o);
}
/**
* Check if the Subject has terminated with an exception.
* @return true if the subject has received a throwable through {@code onError}.
* @since1.2 * /
public boolean hasThrowable(a) {
Object o = state.getLatest();
return NotificationLite.isError(o);
}
/**
* Check if the Subject has terminated normally.
* @return true if the subject completed normally via {@code onCompleted()}
* @since1.2 * /
public boolean hasCompleted(a) {
Object o = state.getLatest();
return NotificationLite.isCompleted(o);
}
/**
* Returns the current value of the Subject if there is such a value and
* the subject hasn't terminated yet.
* <p>The method can return {@code null} for various reasons. Use {@link #hasValue()}, {@link #hasThrowable()}
* and {@link #hasCompleted()} to determine if such {@code null} is a valid value, there was an
* exception or the Subject terminated (with or without receiving any value).
* @return the current value or {@code null} if the Subject doesn't have a value,
* has terminated or has an actual {@code null} as a valid value.
* @since1.2 * /
public T getValue(a) {
Object o = state.getLatest();
if (NotificationLite.isNext(o)) {
return NotificationLite.getValue(o);
}
return null;
}
/**
* Returns the Throwable that terminated the Subject.
* @return the Throwable that terminated the Subject or {@code null} if the
* subject hasn't terminated yet or it terminated normally.
* @since1.2 * /
public Throwable getThrowable(a) {
Object o = state.getLatest();
if (NotificationLite.isError(o)) {
return NotificationLite.getError(o);
}
return null;
}
/**
* Returns a snapshot of the currently buffered non-terminal events into
* the provided {@code a} array or creates a new array if it has not enough capacity.
* @param a the array to fill in
* @return the array {@code a} if it had enough capacity or a new array containing the available values
* @since1.2 * /
@SuppressWarnings("unchecked")
public T[] getValues(T[] a) {
Object o = state.getLatest();
if (NotificationLite.isNext(o)) {
if (a.length == 0) {
a = (T[])Array.newInstance(a.getClass().getComponentType(), 1);
}
a[0] = NotificationLite.getValue(o);
if (a.length > 1) {
a[1] = null; }}else
if (a.length > 0) {
a[0] = null;
}
return a;
}
/**
* Returns a snapshot of the currently buffered non-terminal events.
* <p>The operation is thread-safe.
*
* @return a snapshot of the currently buffered non-terminal events.
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
* @since1.2 * /
@SuppressWarnings("unchecked")
public Object[] getValues() {
T[] r = getValues((T[])EMPTY_ARRAY);
if (r == EMPTY_ARRAY) {
return new Object[0]; // don't leak the default empty array.
}
returnr; }}Copy the code
ReplaySubject basic use
private ReplaySubject<String> replaySubject;
private EditText editText;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_simple2);
// Caches all messages. Once a new observer subscribes, all cached messages are immediately sent to the corresponding observer
replaySubject = ReplaySubject.create();
}
public void click(View v) {
replaySubject.subscribe(new Observer<String>() {
@Override
public void onCompleted(a) {
Log.i("main"."Do...");
}
@Override
public void onError(Throwable e) {
Log.i("main"."Abnormal...");
}
@Override
public void onNext(String message) {
Log.i("main"."News:"+ message); }}); }public void send(View v) {
if(editText==null){
editText = (EditText) findViewById(R.id.et_text);
}
replaySubject.onNext(editText.getText().toString());
}
Copy the code
Basic use of AsyncSubject
private AsyncSubject<String> asyncSubject;
private EditText editText;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_simple2);
/ / summary
// When we send a message, we call onNext. We must also call onComplete() to commit
// The message must be sent, otherwise the observer will not receive the message
If AsyncSubject is used to create and add registered observers, all observers will receive only the latest message
asyncSubject = AsyncSubject.create();
}
public void click(View v) {
asyncSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted(a) {
Log.i("main"."Do...");
}
@Override
public void onError(Throwable e) {
Log.i("main"."Abnormal...");
}
@Override
public void onNext(String message) {
Log.i("main"."News:"+ message); }}); }public void send(View v) {
// if(editText==null){
// editText = (EditText) findViewById(R.id.et_text);
/ /}
asyncSubject.onNext("hello world");
asyncSubject.onNext("qwert");
asyncSubject.onNext("asdfg");
asyncSubject.onNext("zxcvb");
asyncSubject.onCompleted();
}
Copy the code
Result output:
08-03 14:33:56. 457, 15968-15968 / com haocai. Architect. Rxjava I/main: News: ZXCVB 08-03 14:33:56. 457. 15968-15968 / com haocai. Architect. Rxjava I/main: complete...Copy the code
AsyncSubject source code analysis
public final class AsyncSubject<T> extends Subject<T.T> {
final SubjectSubscriptionManager<T> state;
volatile Object lastValue;
/**
* Creates and returns a new {@code AsyncSubject}.
* @param <T> the result value type
* @return the new {@code AsyncSubject}
*/
public static <T> AsyncSubject<T> create(a) {
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
state.onTerminated = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
Object v = state.getLatest();
if (v == null || NotificationLite.isCompleted(v)) {
o.onCompleted();
} else
if (NotificationLite.isError(v)) {
o.onError(NotificationLite.getError(v));
} else {
o.actual.setProducer(newSingleProducer<T>(o.actual, NotificationLite.<T>getValue(v))); }}};return new AsyncSubject<T>(state, state);
}
protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
super(onSubscribe);
this.state = state;
}
@Override
public void onCompleted(a) {
if (state.active) {
Object last = lastValue;
if (last == null) {
last = NotificationLite.completed();
}
for (SubjectObserver<T> bo : state.terminate(last)) {
if (last == NotificationLite.completed()) {
bo.onCompleted();
} else {
bo.actual.setProducer(newSingleProducer<T>(bo.actual, NotificationLite.<T>getValue(last))); }}}}@Override
public void onError(final Throwable e) {
if (state.active) {
Object n = NotificationLite.error(e);
List<Throwable> errors = null;
for (SubjectObserver<T> bo : state.terminate(n)) {
try {
bo.onError(e);
} catch (Throwable e2) {
if (errors == null) {
errors = newArrayList<Throwable>(); } errors.add(e2); } } Exceptions.throwIfAny(errors); }}@Override
public void onNext(T v) {
lastValue = NotificationLite.next(v);
}
@Override
public boolean hasObservers(a) {
return state.observers().length > 0;
}
/**
* Check if the Subject has a value.
* <p>Use the {@link #getValue()} method to retrieve such a value.
* <p>Note that unless {@link #hasCompleted()} or {@link #hasThrowable()} returns true, the value
* retrieved by {@code getValue()} may get outdated.
* @return true if and only if the subject has some value but not an error
* @since1.2 * /
public boolean hasValue(a) {
Object v = lastValue;
Object o = state.getLatest();
return! NotificationLite.isError(o) && NotificationLite.isNext(v); }/**
* Check if the Subject has terminated with an exception.
* @return true if the subject has received a throwable through {@code onError}.
* @since1.2 * /
public boolean hasThrowable(a) {
Object o = state.getLatest();
return NotificationLite.isError(o);
}
/**
* Check if the Subject has terminated normally.
* @return true if the subject completed normally via {@code onCompleted()}
* @since1.2 * /
public boolean hasCompleted(a) {
Object o = state.getLatest();
returno ! =null && !NotificationLite.isError(o);
}
/**
* Returns the current value of the Subject if there is such a value and
* the subject hasn't terminated with an exception.
* <p>The method can return {@code null} for various reasons. Use {@link #hasValue()}, {@link #hasThrowable()}
* and {@link #hasCompleted()} to determine if such {@code null} is a valid value, there was an
* exception or the Subject terminated without receiving any value.
* @return the current value or {@code null} if the Subject doesn't have a value,
* has terminated with an exception or has an actual {@code null} as a value.
* @since1.2 * /
public T getValue(a) {
Object v = lastValue;
Object o = state.getLatest();
if(! NotificationLite.isError(o) && NotificationLite.isNext(v)) {return NotificationLite.getValue(v);
}
return null;
}
/**
* Returns the Throwable that terminated the Subject.
* @return the Throwable that terminated the Subject or {@code null} if the
* subject hasn't terminated yet or it terminated normally.
* @since1.2 * /
public Throwable getThrowable(a) {
Object o = state.getLatest();
if (NotificationLite.isError(o)) {
return NotificationLite.getError(o);
}
return null; }}Copy the code
Some other apis in Observable first: the just() method
AppInfo appInfo1 = new AppInfo("Michael classmate".0);
AppInfo appInfo2 = new AppInfo("Michael classmate".0);
AppInfo appInfo3 = new AppInfo("Michael classmate".0);
Observable.just(appInfo1, appInfo2, appInfo3);
/ / the source code
public static <T> Observable<T> just(T t1, T t2, T t3) {
return from((T[])new Object[] { t1, t2, t3 });
}
Copy the code
Single object to array
The second: repeat() method
public final class OnSubscribeRedo<T> implements OnSubscribe<T> {
final Observable<T> source;
private final Func1<? superObservable<? extends Notification<? > >,? extends Observable<? >> controlHandlerFunction;final boolean stopOnComplete;
final boolean stopOnError;
private final Scheduler scheduler;
static finalFunc1<Observable<? extends Notification<? >>, Observable<? >> REDO_INFINITE =newFunc1<Observable<? extends Notification<? >>, Observable<? > > () {@Override
publicObservable<? > call(Observable<? extends Notification<? >> ts) {return ts.map(newFunc1<Notification<? >, Notification<? > > () {@Override
publicNotification<? > call(Notification<? > terminal) {return Notification.createOnNext(null); }}); }};public static final class RedoFinite implements Func1<Observable<? extends Notification<? > >,Observable<? >>{
final long count;
public RedoFinite(long count) {
this.count = count;
}
@Override
publicObservable<? > call(Observable<? extends Notification<? >> ts) {return ts.map(newFunc1<Notification<? >, Notification<? > > () {int num;
@Override
publicNotification<? > call(Notification<? > terminalNotification) {if (count == 0) {
return terminalNotification;
}
num++;
if (num <= count) {
return Notification.createOnNext(num);
} else {
returnterminalNotification; } } }).dematerialize(); }}public static final class RetryWithPredicate implements Func1<Observable<? extends Notification<? > >,Observable<? extends Notification<? >>>{
final Func2<Integer, Throwable, Boolean> predicate;
public RetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
this.predicate = predicate;
}
@Override
publicObservable<? extends Notification<? >> call(Observable<? extends Notification<? >> ts) {return ts.scan(Notification.createOnNext(0), newFunc2<Notification<Integer>, Notification<? >, Notification<Integer>>() {@SuppressWarnings("unchecked")
@Override
public Notification<Integer> call(Notification
n, Notification
term)
{
final int value = n.getValue();
if (predicate.call(value, term.getThrowable())) {
return Notification.createOnNext(value + 1);
} else {
return(Notification<Integer>) term; }}}); }}public static <T> Observable<T> retry(Observable<T> source) {
return retry(source, REDO_INFINITE);
}
public static <T> Observable<T> retry(Observable<T> source, final long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 expected");
}
if (count == 0) {
return source;
}
return retry(source, new RedoFinite(count));
}
public static <T> Observable<T> retry(Observable<T> source, Func1<? superObservable<? extends Notification<? > >,? extends Observable<? >> notificationHandler) {
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true.false, Schedulers.trampoline()));
}
public static <T> Observable<T> retry(Observable<T> source, Func1<? superObservable<? extends Notification<? > >,? extends Observable<? >> notificationHandler, Scheduler scheduler) {
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true.false, scheduler));
}
public static <T> Observable<T> repeat(Observable<T> source) {
return repeat(source, Schedulers.trampoline());
}
public static <T> Observable<T> repeat(Observable<T> source, Scheduler scheduler) {
return repeat(source, REDO_INFINITE, scheduler);
}
public static <T> Observable<T> repeat(Observable<T> source, final long count) {
return repeat(source, count, Schedulers.trampoline());
}
public static <T> Observable<T> repeat(Observable<T> source, final long count, Scheduler scheduler) {
if (count == 0) {
return Observable.empty();
}
if (count < 0) {
throw new IllegalArgumentException("count >= 0 expected");
}
return repeat(source, new RedoFinite(count - 1), scheduler);
}
public static <T> Observable<T> repeat(Observable<T> source, Func1<? superObservable<? extends Notification<? > >,? extends Observable<? >> notificationHandler) {
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false.true, Schedulers.trampoline()));
}
public static <T> Observable<T> repeat(Observable<T> source, Func1<? superObservable<? extends Notification<? > >,? extends Observable<? >> notificationHandler, Scheduler scheduler) {
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false.true, scheduler));
}
public static <T> Observable<T> redo(Observable<T> source, Func1<? superObservable<? extends Notification<? > >,? extends Observable<? >> notificationHandler, Scheduler scheduler) {
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false.false, scheduler));
}
private OnSubscribeRedo(Observable<T> source, Func1<? superObservable<? extends Notification<? > >,? extends Observable<? >> f,boolean stopOnComplete, boolean stopOnError,
Scheduler scheduler) {
this.source = source;
this.controlHandlerFunction = f;
this.stopOnComplete = stopOnComplete;
this.stopOnError = stopOnError;
this.scheduler = scheduler;
}
@Override
public void call(final Subscriber<? super T> child) {
// when true is a marker to say we are ready to resubscribe to source
final AtomicBoolean resumeBoundary = new AtomicBoolean(true);
// incremented when requests are made, decremented when requests are fulfilled
final AtomicLong consumerCapacity = new AtomicLong();
final Scheduler.Worker worker = scheduler.createWorker();
child.add(worker);
final SerialSubscription sourceSubscriptions = new SerialSubscription();
child.add(sourceSubscriptions);
// use a subject to receive terminals (onCompleted and onError signals) from
// the source observable. We use a BehaviorSubject because subscribeToSource
// may emit a terminal before the restarts observable (transformed terminals)
// is subscribed
finalSubject<Notification<? >, Notification<? >> terminals = BehaviorSubject.<Notification<? >>create().toSerialized();finalSubscriber<Notification<? >> dummySubscriber = Subscribers.empty();// subscribe immediately so the last emission will be replayed to the next
// subscriber (which is the one we care about)
terminals.subscribe(dummySubscriber);
final ProducerArbiter arbiter = new ProducerArbiter();
final Action0 subscribeToSource = new Action0() {
@Override
public void call(a) {
if (child.isUnsubscribed()) {
return;
}
Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
boolean done;
@Override
public void onCompleted(a) {
if(! done) { done =true; unsubscribe(); terminals.onNext(Notification.createOnCompleted()); }}@Override
public void onError(Throwable e) {
if(! done) { done =true; unsubscribe(); terminals.onNext(Notification.createOnError(e)); }}@Override
public void onNext(T v) {
if(! done) { child.onNext(v); decrementConsumerCapacity(); arbiter.produced(1); }}private void decrementConsumerCapacity(a) {
// use a CAS loop because we don't want to decrement the
// value if it is Long.MAX_VALUE
while (true) {
long cc = consumerCapacity.get();
if(cc ! = Long.MAX_VALUE) {if (consumerCapacity.compareAndSet(cc, cc - 1)) {
break; }}else {
break; }}}@Override
public void setProducer(Producer producer) { arbiter.setProducer(producer); }};// new subscription each time so if it unsubscribes itself it does not prevent retries
// by unsubscribing the child subscriptionsourceSubscriptions.set(terminalDelegatingSubscriber); source.unsafeSubscribe(terminalDelegatingSubscriber); }};// the observable received by the control handler function will receive notifications of onCompleted in the case of 'repeat'
// type operators or notifications of onError for 'retry' this is done by lifting in a custom operator to selectively divert
// the retry/repeat relevant values to the control handler
finalObservable<? > restarts = controlHandlerFunction.call( terminals.lift(newOperator<Notification<? >, Notification<? > > () {@Override
public Subscriber<? superNotification<? >> call(final Subscriber<? superNotification<? >> filteredTerminals) {return newSubscriber<Notification<? >>(filteredTerminals) {@Override
public void onCompleted(a) {
filteredTerminals.onCompleted();
}
@Override
public void onError(Throwable e) {
filteredTerminals.onError(e);
}
@Override
public void onNext(Notification
t) {
if (t.isOnCompleted() && stopOnComplete) {
filteredTerminals.onCompleted();
} else if (t.isOnError() && stopOnError) {
filteredTerminals.onError(t.getThrowable());
} else{ filteredTerminals.onNext(t); }}@Override
public void setProducer(Producer producer) { producer.request(Long.MAX_VALUE); }}; }}));// subscribe to the restarts observable to know when to schedule the next redo.
worker.schedule(new Action0() {
@Override
public void call(a) {
restarts.unsafeSubscribe(new Subscriber<Object>(child) {
@Override
public void onCompleted(a) {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(Object t) {
if(! child.isUnsubscribed()) {// perform a best endeavours check on consumerCapacity
// with the intent of only resubscribing immediately
// if there is outstanding capacity
if (consumerCapacity.get() > 0) {
worker.schedule(subscribeToSource);
} else {
// set this to true so that on next request
// subscribeToSource will be scheduled
resumeBoundary.compareAndSet(false.true); }}}@Override
public void setProducer(Producer producer) { producer.request(Long.MAX_VALUE); }}); }}); child.setProducer(new Producer() {
@Override
public void request(final long n) {
if (n > 0) {
BackpressureUtils.getAndAddRequest(consumerCapacity, n);
arbiter.request(n);
if (resumeBoundary.compareAndSet(true.false)) { worker.schedule(subscribeToSource); }}}}); }}Copy the code
Data duplication
Third: defer() method
An Observable is created only when a subscriber subscribes, and a new Observable is created for each subscription. Internally, call Func0 at subscribe time to create an Observable via OnSubscribeDefer. (Lazy loading)
private Observable<AppInfo> observable;
private AppInfoAdapter appInfoAdapter;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_simple9);
observable = getApps();
initView();
// registerObserver();
}
private void initView(a) {
ListView listView = (ListView) findViewById(R.id.lv_app_name);
appInfoAdapter = new AppInfoAdapter(this);
listView.setAdapter(appInfoAdapter);
}
/** * Create Observable **@return* /
private Observable<AppInfo> getApps(a) {
// Once an observer subscribes, the call method in Func0 is immediately called back to create an Observable(lazy loading)
return Observable.defer(new Func0<Observable<AppInfo>>() {
// @Override
// public Observable<AppInfo> call() {
// return Observable.create(new Observable.OnSubscribe<AppInfo>() {
//
// @Override
// public void call(Subscriber<? super AppInfo> t) {
// t.next (new AppInfo(" dream classmate ", 0)); // t.next (new AppInfo(" dream classmate ", 0));
// t.onCompleted();
// }
// });
// }
@Override
public Observable<AppInfo> call(a) {
Log.i("main"."Create Observable......");
AppInfo appInfo1 = new AppInfo("Michael classmate".0);
AppInfo appInfo2 = new AppInfo("Michael classmate".0);
AppInfo appInfo3 = new AppInfo("Michael classmate".0);
return Observable.just(appInfo1, appInfo2, appInfo3).repeat(3); }}); }public void click(View v) {
observable.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted(a) {
// Refresh the UI when done
appInfoAdapter.notifyDataSetChanged();
}
@Override
public void onError(Throwable e) {}@Override
public void onNext(AppInfo t) {
// Add dataappInfoAdapter.addAppInfo(t); }}); }Copy the code
Click onClick to load 9 pieces of data
Fourth: the Ranger () method
private Observable<Integer> observable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_simple9);
observable = getApps();
}
/** * Create Observable **@return* /
private Observable<Integer> getApps(a) {
// From the specified location, you need to send back how many
// The first argument: the starting value
// The second argument: quantity
// Note: the first argument is accumulated by quantity, and then sent
// The range() function takes two numbers: the first is the starting point and the second is the number of numbers we want to emit.
return Observable.range(5.5);
}
public void click(View v) {
observable.subscribe(new Observer<Integer>() {
@Override
public void onCompleted(a) {
// Refresh the UI when done
}
@Override
public void onError(Throwable e) {}@Override
public void onNext(Integer t) {
// Add data
Log.i("main"."Data received:"+ t); }}); }Copy the code
Result output:
08-04 05:49:20. 818, 13051-13051 / com haocai. Architect. Rxjava I/main: receives the data: 08-04 05:49:20. 5, 819, 13051-13051 / com. Haocai. Architect. Rxjava I/main: receives the data: 6 08-04 05:49:20. 819, 13051-13051 / com haocai. Architect. Rxjava I/main: receives the data: 7 08-04 05:49:20. 819, 13051-13051 / com haocai. Architect. Rxjava I/main: receives the data: 8 08-04 05:49:20. 819, 13051-13051 / com. Haocai. Architect. Rxjava I/main: receives the data: 9Copy the code
Interval () : creates an Observable that emits integer sequences at fixed intervals. This is easier to understand. Interval () is also used to create observables, and can delay sending. But interval() executes periodically, so think of it this way: Interval () is a TimerTask that can specify threads.
Number six: the timer() method
private Observable<Long> observable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_simple9);
observable = getApps();
}
/** * Create Observable **@return* /
private Observable<Long> getApps(a) {
// Timer method explanation
// The first argument: the interval between sending messages
// Second argument: time units (milliseconds, seconds, minutes, hours, and so on......)
return Observable.timer(3, TimeUnit.SECONDS);
}
public void click(View v) {
// The rotation notifies the observer
observable.subscribe(new Observer<Long>() {
@Override
public void onCompleted(a) {
// Refresh the UI when done
}
@Override
public void onError(Throwable e) {}@Override
public void onNext(Long t) {
// Add data
Log.i("main"."Data received:"+ t); }}); }Copy the code
The output
08-04 08:25:14. 754, 24259-24357 / com haocai. Architect. Rxjava I/main: the received data: 0Copy the code
Conclusion:
Observable steps: Prepare an Observer, prepare an Observable, register an Observer(Observable.subscribe (new Observer())), unsubscribe(subscribe.unsubscribe()), and some logic in between