This section describes what the cache operator does.
It caches the items that the previous operation delivers through onNext. On every later subscribe, the previous step is skipped and the cached items are passed straight to the new subscriber.
Usage
Text alone may not make the purpose of cache clear, so picture an HTTP request that takes some time. Without a cache, every subscribe actually issues a new HTTP request. If you append the cache operator to the end of the chain, the next subscribe is served from the cache and no HTTP request is made.
Let’s write a little code.
var handler = Handler()
// The effect of cache is easier to see with a delay
// observable = Observable.create(object : Observable.OnSubscribe<String> {
//     override fun call(t: Subscriber<in String>) {
//         t.onNext("Test1")
//         t.onNext("Test2")
//         t.onNext("Test3")
//     }
// }).delay(4, TimeUnit.SECONDS).cache()
observable = Observable.create(object : Observable.OnSubscribe<String> {
    override fun call(t: Subscriber<in String>) {
        t.onNext("Test1")
        t.onNext("Test2")
        t.onNext("Test3")
    }
}).cache()
btSub.setOnClickListener({
    observable?.subscribe({ msg ->
        handler.post(Runnable {
            tvContent.text = tvContent.text.toString() + "\n" + msg
        })
    })
})
With the delayed version, the first click on the subscribe button shows Test1, Test2, Test3 only after the 4-second delay. Once the items have been cached, the second, third, and later clicks show Test1, Test2, Test3 without going through the delay operator again.
After such a long introduction, I think you now have some understanding of the cache operator. So let’s take our own doubts and guesses and step into the source code, following the order of the demo.
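Before reading the source, here is a minimal sketch of the behavior the demo shows, written in plain Java without RxJava. Every name in it (CacheSketch, Cached, expensiveSource, sourceRuns) is made up for illustration, and it ignores threading, errors, and completion entirely. It only shows that the upstream step runs once while every subscribe still receives all three items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the idea behind cache(); this is not RxJava code. */
public class CacheSketch {
    /** Counts how many times the expensive upstream step actually ran. */
    public static int sourceRuns = 0;

    /** Simulates the expensive upstream step, for example an HTTP request. */
    static List<String> expensiveSource() {
        sourceRuns++;
        List<String> items = new ArrayList<>();
        items.add("Test1");
        items.add("Test2");
        items.add("Test3");
        return items;
    }

    /** Runs the source on the first subscribe only and replays the stored items afterwards. */
    public static final class Cached {
        private List<String> buffer; // stays null until the first subscribe

        public void subscribe(Consumer<String> onNext) {
            if (buffer == null) {
                buffer = expensiveSource(); // first subscribe: really run the source
            }
            for (String item : buffer) {
                onNext.accept(item);        // every subscribe: deliver the stored items
            }
        }
    }

    /** Simulates three clicks on the subscribe button and collects everything delivered. */
    public static List<String> demo() {
        sourceRuns = 0;
        Cached cached = new Cached();
        List<String> out = new ArrayList<>();
        for (int click = 1; click <= 3; click++) {
            cached.subscribe(out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println("source ran " + sourceRuns + " time(s)");
    }
}
```

The real operator has to cope with subscribers that arrive while the source is still emitting, which is why the source below is more involved than this sketch.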
###### Take a look at the source
Observable
public final Observable<T> cache() {
    return CachedObservable.from(this);
}
CachedObservable
public static <T> CachedObservable<T> from(Observable<? extends T> source) {
    return (CachedObservable<T>)from(source, 16);
}
public static <T> CachedObservable<T> from(Observable<? extends T> source, int capacityHint) {
    if (capacityHint < 1) {
        throw new IllegalArgumentException("capacityHint > 0 required");
    }
    CacheState<T> state = new CacheState<T>(source, capacityHint);
    CachedSubscribe<T> onSubscribe = new CachedSubscribe<T>(state);
    return new CachedObservable<T>(onSubscribe, state);
}
My understanding: as introduced earlier, cache stores the items passed down from the previous step, so it must keep an array or a list somewhere. Seeing capacityHint here, I guess CacheState is that container. That is not the most important part, though. In any Observable subclass, the two most important things to look at are the OnSubscribe and the Subscriber.
CachedObservable.CacheState
static final class CacheState<T> extends LinkedArrayList implements Observer<T> {
    ...
}
CachedObservable.CachedSubscribe
static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {
    ...
}
The OnSubscribe is obviously CachedSubscribe. Is CacheState the Subscriber, then? CacheState implements the Observer interface:
Observer
public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}
As covered in several previous articles, a Subscriber is the concrete implementation of onNext and the other callbacks. So for now we can treat CacheState as a Subscriber, which helps us a lot.
public abstract class Subscriber<T> implements Observer<T>, Subscription {
    ...
}
Subscriber implements the Observer interface and adds methods such as unsubscribe and onStart. When reading an RxJava operator, what we care about is the onNext method of the Subscriber and the call method of the OnSubscribe. Once those two are clear, we understand the operator well.
Now that we have found the two important objects, let’s dig deeper. Observable.cache() returns a CachedObservable, and clicking the button then calls subscribe on it.
Observable
...
public final Subscription subscribe(final Action1<? super T> onNext) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
    Action0 onCompleted = Actions.empty();
    return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}
...
public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}
...
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    ...
    subscriber.onStart();
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }
    RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
    ...
I have posted this code in the last couple of articles, but I am posting it again so that it sticks. Calling subscribe really comes down to observable.onSubscribe.call(subscriber). So what are observable, onSubscribe, and subscriber here? The more operators a chain uses, the harder this is to work out at the end. We use only one operator here, so observable is the CachedObservable and observable.onSubscribe is the CachedSubscribe. If you assume subscriber is the CacheState, you are wrong. So what exactly is subscriber? Let’s start from this code and work our way back up. subscriber is a parameter passed in from outside:
...
if (!(subscriber instanceof SafeSubscriber)) {
    subscriber = new SafeSubscriber<T>(subscriber);
}
...
This tells us that the final subscriber is a SafeSubscriber. The original subscriber is kept inside the SafeSubscriber as a constructor argument.
So let’s go one step further back and see what the original subscriber was.
...
public final Subscription subscribe(final Action1<? super T> onNext) {
    ...
    return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}
...
Clearly, the Action we passed in the demo is wrapped in an ActionSubscriber.
So subscriber ends up being a SafeSubscriber that wraps this ActionSubscriber.
Let’s go further.
observable.onSubscribe.call(subscriber)
static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {
    ...
    @Override
    public void call(Subscriber<? super T> t) {
        ReplayProducer<T> rp = new ReplayProducer<T>(t, state);
        state.addProducer(rp);
        t.add(rp);
        t.setProducer(rp);
        if (!get() && compareAndSet(false, true)) {
            state.connect();
        }
    }
}
Let’s take it line by line.
- Create a ReplayProducer object
static final class ReplayProducer<T> extends AtomicLong implements Producer, Subscription {
    ...
    public ReplayProducer(Subscriber<? super T> child, CacheState<T> state) {
        this.child = child;
        this.state = state;
    }
    @Override
    public void request(long n) {
        ...
    }
    ...
ReplayProducer is a Producer. As we have seen before, for a Producer we only need to look at the request method, and we will get to its request in a moment.
- Add the ReplayProducer to the CacheState object
public void addProducer(ReplayProducer<T> p) {
    synchronized (connection) {
        ReplayProducer<?>[] a = producers;
        int n = a.length;
        ReplayProducer<?>[] b = new ReplayProducer<?>[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = p;
        producers = b;
    }
}
A new array one element longer is allocated, the old elements are copied over, and the new producer is appended. This is pretty simple, so I won’t go into it.
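If the copy-then-swap pattern is unfamiliar, the following is a small sketch of it in isolation. The class and member names (CopyOnWriteSketch, lock, snapshot) are hypothetical, the elements are plain strings instead of ReplayProducer objects, and Arrays.copyOf is used here in place of the manual System.arraycopy call. Marking the field volatile is my assumption about how readers would see the new array.

```java
import java.util.Arrays;

/** Hypothetical illustration of appending to an array by copying it; not RxJava code. */
public class CopyOnWriteSketch {
    private final Object lock = new Object();
    // Readers only ever see a fully built array, because the field is replaced in one step.
    private volatile String[] producers = new String[0];

    /** Appends p by building a new array one element longer and swapping it in. */
    public void addProducer(String p) {
        synchronized (lock) {            // writers are serialized
            String[] a = producers;
            String[] b = Arrays.copyOf(a, a.length + 1);
            b[a.length] = p;
            producers = b;               // the old array is never modified
        }
    }

    /** Returns the current array; a loop over it is unaffected by later additions. */
    public String[] snapshot() {
        return producers;
    }

    public static void main(String[] args) {
        CopyOnWriteSketch state = new CopyOnWriteSketch();
        String[] before = state.snapshot();
        state.addProducer("rp1");
        state.addProducer("rp2");
        System.out.println(before.length + " -> " + Arrays.toString(state.snapshot()));
    }
}
```

The point of the design is that code iterating over an earlier snapshot never sees a half-updated array, at the cost of one copy per added subscriber.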
- Bind the ReplayProducer, as a Subscription, to the Subscriber
public final void add(Subscription s) {
    subscriptions.add(s);
}
...
@Override
public final void unsubscribe() {
    subscriptions.unsubscribe();
}
...
In other words, when subscriber.unsubscribe() is called, the corresponding Producer is unsubscribed as well. The same pattern appears in the operators covered earlier.
- Call the Producer’s request method
As we have seen before, setProducer directly calls the Producer’s request method.
Note: since we use only one operator here, things are relatively simple, so let’s understand it this way for now. Later articles will cover chains that mix several operators, which may look more complicated.
...
public void request(long n) {
    for (;;) {
        long r = get();
        if (r < 0) {
            return;
        }
        long u = r + n;
        if (u < 0) {
            u = Long.MAX_VALUE;
        }
        if (compareAndSet(r, u)) {
            replay();
            return;
        }
    }
}
public void replay() {
    synchronized (this) {
        if (emitting) {
            missed = true;
            return;
        }
        emitting = true;
    }
    boolean skipFinal = false;
    try {
        final Subscriber<? super T> child = this.child;
        for (;;) {
            long r = get();
            if (r < 0L) {
                skipFinal = true;
                return;
            }
            int s = state.size();
            if (s != 0) {
                // This part is shown later, when it is analyzed
                ...
            }
            synchronized (this) {
                if (!missed) {
                    emitting = false;
                    skipFinal = true;
                    return;
                }
                missed = false;
            }
        }
    } finally {
        if (!skipFinal) {
            synchronized (this) {
                emitting = false;
            }
        }
    }
}
}
...
The request method calls replay. The most important statement in replay is
int s = state.size();
if (s != 0) {
    ...
}
On the first subscribe nothing has been cached yet, so state.size() == 0 and this first call to replay does essentially nothing.
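The counting part of request shown above can be tried on its own. The sketch below keeps only the compare-and-set loop: it adds n to the outstanding demand, caps the sum at Long.MAX_VALUE when the addition overflows, and refuses to count once the value is negative. The class name RequestCounterSketch, the cancel method, and the choice of Long.MIN_VALUE as the "unsubscribed" marker are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, stripped-down version of the demand counter; not the RxJava class. */
public class RequestCounterSketch extends AtomicLong {
    private static final long serialVersionUID = 1L;

    /** Adds n to the outstanding demand; returns false if the counter was already cancelled. */
    public boolean request(long n) {
        for (;;) {
            long r = get();
            if (r < 0) {
                return false;           // negative means cancelled in this sketch
            }
            long u = r + n;
            if (u < 0) {
                u = Long.MAX_VALUE;     // the addition overflowed, so cap it
            }
            if (compareAndSet(r, u)) {
                return true;            // nobody changed the value in between
            }
            // another thread won the race: reread and try again
        }
    }

    /** Marks the counter as cancelled (assumed marker value). */
    public void cancel() {
        set(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        RequestCounterSketch counter = new RequestCounterSketch();
        counter.request(5);
        counter.request(Long.MAX_VALUE);
        System.out.println(counter.get() == Long.MAX_VALUE);
    }
}
```

The retry loop is what lets several threads request at the same time without a lock.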
- On the first subscribe, this is executed
if (!get() && compareAndSet(false, true)) {
    state.connect();
}
Look first at !get() && compareAndSet(false, true). Because CachedSubscribe extends AtomicBoolean, get() == false the first time and the value is then set to true, which ensures that state.connect() is not called on later subscribes.
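Here is a tiny sketch of this run-once guard. ConnectOnceSketch and connectCalls are hypothetical names, and the counter simply stands in for the call to state.connect(); only the AtomicBoolean calls are the same as in the source.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration of the run-once guard; not the RxJava class. */
public class ConnectOnceSketch extends AtomicBoolean {
    private static final long serialVersionUID = 1L;

    /** Stands in for state.connect(); counts how often the guarded branch ran. */
    public int connectCalls = 0;

    public void subscribe() {
        // get() is a cheap check for the common case; compareAndSet decides the winner
        // if two threads get here at the same time.
        if (!get() && compareAndSet(false, true)) {
            connectCalls++;
        }
    }

    public static void main(String[] args) {
        ConnectOnceSketch guard = new ConnectOnceSketch();
        guard.subscribe();
        guard.subscribe();
        guard.subscribe();
        System.out.println("connect ran " + guard.connectCalls + " time(s)");
    }
}
```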
Let’s look at the connect method in detail. The most important caching operations are here.
...
public void connect() {
    Subscriber<T> subscriber = new Subscriber<T>() {
        @Override
        public void onNext(T t) {
            CacheState.this.onNext(t);
        }
        @Override
        public void onError(Throwable e) {
            CacheState.this.onError(e);
        }
        @Override
        public void onCompleted() {
            CacheState.this.onCompleted();
        }
    };
    connection.set(subscriber);
    source.unsafeSubscribe(subscriber);
    isConnected = true;
}
...
Let’s go through it step by step.
- Create a Subscriber
We already know what cache does: after the first time, calling subscribe again just takes the cached data and passes it on.
So the newly created Subscriber is the one that simply forwards what it receives into CacheState. We can draw a rough picture of this.
Look at the picture:
The first subscribe takes the upper path, and the second subscribe goes straight along the path below the dotted line. The Subscriber in the middle is the one we are discussing now. From the second subscribe on, the OnSubscribe in the demo is skipped entirely.
Note: the Subscriber from the demo does not appear here. From the earlier analysis, we know it has been wrapped in a SafeSubscriber and stored in the child field of ReplayProducer.
- Save the subscriber
As mentioned, everything after the first subscribe goes through this subscriber, so it definitely has to be saved.
public void set(Subscription s) {
    ...
    state.update(s);
}
SequentialSubscription
...
public boolean update(Subscription next) {
    for (;;) {
        Subscription current = get();
        if (current == Unsubscribed.INSTANCE) {
            if (next != null) {
                next.unsubscribe();
            }
            return false;
        }
        if (compareAndSet(current, next)) {
            if (current != null) {
                current.unsubscribe();
            }
            return true;
        }
    }
}
...
As expected, the subscriber is indeed saved: compareAndSet(current, next) stores it.
- On the first subscribe, call the previous step’s onSubscribe
source.unsafeSubscribe(subscriber);
This is in effect
source.onSubscribe.call(subscriber);
source is the Observable passed in when the CachedObservable was created, that is, the one built with Observable.create() in our demo, so its onSubscribe is exactly the OnSubscribe from the demo. For now we keep our focus on subscriber.
From the code above it is easy to see that this subscriber is the newly created one, and that its onNext directly calls onNext of CacheState.
CachedObservable.CacheState
...
public void onNext(T t) {
    if (!sourceDone) {
        Object o = NotificationLite.next(t);
        add(o);
        dispatch();
    }
}
...
When the OnSubscribe in the demo calls subscriber.onNext, it actually enters CachedObservable.CacheState.onNext.
Again, let’s go statement by statement.
- NotificationLite.next(t)
public static <T> Object next(T t) {
    if (t == null) {
        return ON_NEXT_NULL_SENTINEL;
    } else {
        return t;
    }
}
In our demo the argument is obviously not null, so t itself is returned.
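The null branch is worth a quick look even though the demo never hits it. As I read it, the sentinel lets a null item be stored in the buffer as a real object, so that it can still be told apart from "no item". The sketch below shows that wrap and unwrap idea; NullSentinelSketch, wrap, and unwrap are hypothetical names, and the unwrap side is my assumption about how the value would be turned back into null on the way out.

```java
/** Hypothetical illustration of a null sentinel; not the RxJava NotificationLite class. */
public class NullSentinelSketch {
    /** A unique object that represents "the item was null" inside the buffer. */
    private static final Object NULL_SENTINEL = new Object();

    /** Returns the item itself, or the sentinel if the item is null. */
    public static Object wrap(Object t) {
        return t == null ? NULL_SENTINEL : t;
    }

    /** Reverses wrap: the sentinel becomes null again, anything else is returned as is. */
    public static Object unwrap(Object o) {
        return o == NULL_SENTINEL ? null : o;
    }

    public static void main(String[] args) {
        System.out.println(unwrap(wrap("Test1")));
        System.out.println(unwrap(wrap(null)));
    }
}
```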
- add(o)
We mentioned earlier that CacheState is the container that holds the items passed down from above. This is where it happens: add(o).
- dispatch()
void dispatch() {
    ReplayProducer<?>[] a = producers;
    for (ReplayProducer<?> rp : a) {
        rp.replay();
    }
}
Obviously the most important statement here is rp.replay(). Leaving aside for now why a for loop is used, let’s see what rp.replay() does.
public void replay() {
    ...
    int s = state.size();
    if (s != 0) {
        ...
        if (NotificationLite.accept(child, o)) {
            ...
        }
    }
    ...
}
NotificationLite.accept
public static <T> boolean accept(Observer<? super T> o, Object n) {
    ...
    o.onNext((T) n);
    return false;
    ...
}
replay checks whether CacheState holds any cached items and, if so, calls child.onNext(o) directly.
So what is child? It is the SafeSubscriber we analyzed earlier.
onNext is called layer by layer and finally reaches our own Action.
Second call to subscribe
We have already walked through the whole first call to subscribe, so now let’s go straight through the second call.
We start with CachedSubscribe’s call method.
public void call(Subscriber<? super T> t) {
    ReplayProducer<T> rp = new ReplayProducer<T>(t, state);
    state.addProducer(rp);
    t.add(rp);
    t.setProducer(rp);
    if (!get() && compareAndSet(false, true)) {
        state.connect();
    }
}
- Create another ReplayProducer
- Add it to the CacheState
- Bind the ReplayProducer to the SafeSubscriber
- Call the Producer’s request method, which in this case actually calls ReplayProducer’s replay method (analyzed earlier)
- Because the flag was already set to true the first time, get() == true and state.connect() is skipped
Therefore, in summary, we again go straight to ReplayProducer’s replay method.
public void replay() {
    ...
    int s = state.size();
    if (s != 0) {
        ...
        if (NotificationLite.accept(child, o)) {
            ...
        }
    }
    ...
}
NotificationLite.accept
public static <T> boolean accept(Observer<? super T> o, Object n) {
    ...
    o.onNext((T) n);
    return false;
    ...
}
The first subscribe saved every emitted item in CacheState. In our demo, the three strings Test1, Test2, and Test3 are stored in state, so this time they are passed directly to SafeSubscriber’s onNext method.
Conclusion
In general, the source of the cache operator looks more difficult than that of the first two operators, but with those two as background it is relatively easy to follow.
Additional
We now have a general understanding of the overall cache flow, but one question remains. Let’s look at this code again.
void dispatch() {
    ReplayProducer<?>[] a = producers;
    for (ReplayProducer<?> rp : a) {
        rp.replay();
    }
}
Why is there a for loop here?
In fact, we can see it from the picture above: Test1, Test2, Test3 are repeated three times, because I clicked the subscribe button 3 times in a row in the demo. Subscribing 3 times adds 3 ReplayProducers to CacheState, so each call to dispatch loops over replay 3 times, and the items are output 3 times.
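To make the loop concrete, here is a simplified, single-threaded sketch of one shared buffer feeding several replayers. DispatchSketch, Replayer, and demo are hypothetical names, and giving each replayer its own index into the buffer is my simplification of how a producer remembers what it has already delivered. There is no locking, no request counting, and no completion handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical single-threaded model of dispatch(); not the RxJava CacheState class. */
public class DispatchSketch {
    private final List<String> buffer = new ArrayList<>();      // every item seen so far
    private final List<Replayer> producers = new ArrayList<>(); // one entry per subscribe

    /** Delivers to one subscriber whatever that subscriber has not received yet. */
    public final class Replayer {
        private final Consumer<String> child;
        private int index; // position of the next undelivered item

        Replayer(Consumer<String> child) {
            this.child = child;
        }

        void replay() {
            while (index < buffer.size()) {
                child.accept(buffer.get(index++));
            }
        }
    }

    /** Registers a subscriber and immediately replays what is already buffered. */
    public void subscribe(Consumer<String> child) {
        Replayer rp = new Replayer(child);
        producers.add(rp);
        rp.replay();
    }

    /** Stores the item, then lets every registered replayer catch up. */
    public void onNext(String item) {
        buffer.add(item);
        for (Replayer rp : producers) {
            rp.replay();
        }
    }

    /** Three subscribers register first, then one item arrives from the source. */
    public static List<String> demo() {
        DispatchSketch state = new DispatchSketch();
        List<String> out = new ArrayList<>();
        state.subscribe(s -> out.add("A:" + s));
        state.subscribe(s -> out.add("B:" + s));
        state.subscribe(s -> out.add("C:" + s));
        state.onNext("Test1");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With three replayers registered, a single onNext reaches all three subscribers, which is the repetition described above.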