This section describes what the Amb operator does.
Amb is given multiple Observables and mirrors whichever one emits first; the others are unsubscribed.
Usage
I don't use this operator very often, but it is easy to imagine where it fits. You have several asynchronous operations and don't know which one will finish first, and you only want to react to whichever one gets there first. Amb handles exactly that case.
Let's write a little code.
.
observable = Observable.amb(Observable.create(object : Observable.OnSubscribe<String> {
    override fun call(t: Subscriber<in String>) {
        t.onNext("Test1")
    }
}), Observable.create(object : Observable.OnSubscribe<String> {
    override fun call(t: Subscriber<in String>) {
        t.onNext("Test2")
    }
}))
.
observable?.subscribe({ msg ->
    tvContent.text = tvContent.text.toString() + "\n" + msg
})
Here we create two Observables and pass them to the amb operator, then subscribe to the result. The demo is very simple and needs no further introduction, so let's go straight to the source code analysis.
Look at the source code
Observable
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
    return unsafeCreate(OnSubscribeAmb.amb(o1, o2));
}
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}
If you read the previous article, you'll see that Observable.create and Observable.unsafeCreate do the same thing.
Observable
.
public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}
.
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}
.
From these lines of code, we can see that the most important line is actually this one
OnSubscribeAmb.amb(o1, o2)
Even without reading further, we can tell that this statement creates an OnSubscribe object. As described in the previous article, the two things that matter most are the call method of the OnSubscribe and the onNext, onCompleted and related methods of the Subscriber.
Let’s dive further into OnSubscribeAmb
.
public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
    List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
    sources.add(o1);
    sources.add(o2);
    return amb(sources);
}
.
public static <T> OnSubscribe<T> amb(final Iterable<? extends Observable<? extends T>> sources) {
    return new OnSubscribeAmb<T>(sources);
}
.
public void call(final Subscriber<? super T> subscriber) {
    final Selection<T> selection = new Selection<T>();
    subscriber.add(Subscriptions.create(new Action0() {
        @Override
        public void call() {
            AmbSubscriber<T> c;
            if ((c = selection.get()) != null) {
                c.unsubscribe();
            }
            unsubscribeAmbSubscribers(selection.ambSubscribers);
        }
    }));
    for (Observable<? extends T> source : sources) {
        if (subscriber.isUnsubscribed()) {
            break;
        }
        AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
        selection.ambSubscribers.add(ambSubscriber);
        AmbSubscriber<T> c;
        if ((c = selection.get()) != null) {
            // Already chose one, the rest can be skipped and we can clean up
            selection.unsubscribeOthers(c);
            return;
        }
        source.unsafeSubscribe(ambSubscriber);
    }
    // while subscribing unsubscription may have occurred so we clean up after
    if (subscriber.isUnsubscribed()) {
        unsubscribeAmbSubscribers(selection.ambSubscribers);
    }
    .
}
This code is a little long, but the main thing is of course the call method. The amb overloads simply create a new OnSubscribeAmb, so there is not much to say about them.
Ultimately, call is what gets executed, so let's focus on the call method.
Again, I'd like to suggest a personal approach to reading source code. If you already know what a method or class does, first try to imagine how you would implement it yourself.
For example, if I were implementing it, I would iterate over the List<Observable> and subscribe to each one in turn. As soon as the first one emitted, I would unsubscribe from all the others.
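To make that guess concrete, here is a minimal single-threaded sketch in plain Java. It does not use RxJava at all: `Source`, `race` and `demo` are hypothetical names invented for this illustration, and the sketch skips cancellation handles and thread safety, both of which the real OnSubscribeAmb has to deal with.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class NaiveAmbSketch {
    /** Hypothetical stand-in for an Observable: pushes String values to a listener. */
    interface Source {
        void subscribe(Consumer<String> listener);
    }

    /** Subscribes to the sources in order and forwards only what the first emitter sends. */
    static List<String> race(List<Source> sources) {
        List<String> received = new ArrayList<>();
        int[] winner = {-1};                      // index of the winning source, -1 while undecided
        for (int i = 0; i < sources.size(); i++) {
            if (winner[0] != -1) {
                break;                            // a winner already emitted, skip the rest
            }
            final int index = i;
            sources.get(i).subscribe(value -> {
                if (winner[0] == -1) {
                    winner[0] = index;            // the first emission decides the race
                }
                if (winner[0] == index) {
                    received.add(value);          // emissions from losers are dropped
                }
            });
        }
        return received;
    }

    /** Mirrors the demo: two synchronous sources emitting "Test1" and "Test2". */
    static List<String> demo() {
        Source first = listener -> listener.accept("Test1");
        Source second = listener -> listener.accept("Test2");
        return race(Arrays.asList(first, second));
    }
}
```

Because both sources in `demo()` emit synchronously, the first one wins while it is being subscribed, and the second one is never subscribed at all.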
Let's dig a little deeper. The call method uses a Selection, so let's see what that is.
OnSubscribeAmb
static final class Selection<T> extends AtomicReference<AmbSubscriber<T>> {
    final Collection<AmbSubscriber<T>> ambSubscribers = new ConcurrentLinkedQueue<AmbSubscriber<T>>();
    public void unsubscribeLosers() {
        AmbSubscriber<T> winner = get();
        if (winner != null) {
            unsubscribeOthers(winner);
        }
    }
    public void unsubscribeOthers(AmbSubscriber<T> notThis) {
        for (AmbSubscriber<T> other : ambSubscribers) {
            if (other != notThis) {
                other.unsubscribe();
            }
        }
        ambSubscribers.clear();
    }
}
Amb is typically used with asynchronous operations, so thread safety is a concern; that is why an Atomic* class (here AtomicReference) is used. I won't go into the details; if you are interested, look it up.
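As a rough illustration of why an atomic reference suits this job, the following sketch lets several threads compete for a single slot using only the JDK. The class and method names are made up for this example and the code is not taken from RxJava; it only shows that `AtomicReference.compareAndSet(null, value)` lets exactly one contender succeed, however many threads try at once.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class AtomicWinnerSketch {
    /** Lets `contenders` threads compete for one slot and returns how many of them claimed it. */
    static int raceOnce(int contenders) {
        AtomicReference<String> winner = new AtomicReference<>();  // null means "no winner yet"
        AtomicInteger claims = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[contenders];
        for (int i = 0; i < contenders; i++) {
            final String name = "source-" + i;
            threads[i] = new Thread(() -> {
                try {
                    start.await();                 // release all threads at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (winner.compareAndSet(null, name)) {
                    claims.incrementAndGet();      // only the thread that set the slot gets here
                }
            });
            threads[i].start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return claims.get();
    }
}
```

A plain field with a separate "is it null?" check followed by an assignment would not give this guarantee, because two threads could both pass the check before either one writes.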
Let's look at what Selection mainly does:
- It holds a variable, ambSubscribers, that stores Subscriber objects; the concrete implementation is AmbSubscriber.
- It unsubscribes the other Subscribers.
Now that we have looked at Selection, let's go back to the code in call.
// Part 1
final Selection<T> selection = new Selection<T>();
// Part 2
subscriber.add(Subscriptions.create(...));
// Part 3
for (Observable<? extends T> source : sources) {}
// Part 4
if (subscriber.isUnsubscribed()) {
    unsubscribeAmbSubscribers(selection.ambSubscribers);
}
// Part 5
subscriber.setProducer(...)
With the code labeled this way, we can simply go through the parts one by one. The first part just creates a Selection, which needs no further explanation, so let's move on to the second part.
The second part
The second part is also very simple: it adds a Subscription to the Subscriber.
Subscriber
.
public final void add(Subscription s) {
    subscriptions.add(s);
}
@Override
public final void unsubscribe() {
    subscriptions.unsubscribe();
}
.
When the Subscriber is unsubscribed, every Subscription held in its subscriptions list is unsubscribed as well.
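The idea of a parent that cancels all of its children can be sketched in a few lines of plain Java. This is only an illustration with invented names (`CompositeCancelSketch`, `add`, `unsubscribe`, `demo`); it is single-threaded, and running a late-added action immediately is a design choice of this sketch rather than a statement about the library's internals.

```java
import java.util.ArrayList;
import java.util.List;

class CompositeCancelSketch {
    private final List<Runnable> children = new ArrayList<>();
    private boolean unsubscribed;

    /** Registers a cancel action; runs it immediately if the parent is already unsubscribed. */
    void add(Runnable cancelAction) {
        if (unsubscribed) {
            cancelAction.run();
            return;
        }
        children.add(cancelAction);
    }

    /** Runs every registered cancel action once, then forgets them. */
    void unsubscribe() {
        if (unsubscribed) {
            return;
        }
        unsubscribed = true;
        for (Runnable child : children) {
            child.run();
        }
        children.clear();
    }

    /** Registers two actions, unsubscribes twice, and reports how many actions ran. */
    static int demo() {
        int[] cancelled = {0};
        CompositeCancelSketch parent = new CompositeCancelSketch();
        parent.add(() -> cancelled[0]++);
        parent.add(() -> cancelled[0]++);
        parent.unsubscribe();
        parent.unsubscribe();   // the second call is a no-op
        return cancelled[0];
    }
}
```

In Amb, the action registered in part 2 plays the role of one such child: when the downstream Subscriber is unsubscribed, it unsubscribes the AmbSubscribers.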
The third part
This part is in fact consistent with the guess I made earlier. So when you have time, try guessing the implementation yourself first; it is good exercise. Let's look at the code in part 3 in detail.
for (Observable<? extends T> source : sources) {
    if (subscriber.isUnsubscribed()) {
        break;
    }
    AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
    selection.ambSubscribers.add(ambSubscriber);
    AmbSubscriber<T> c;
    if ((c = selection.get()) != null) {
        selection.unsubscribeOthers(c);
        return;
    }
    source.unsafeSubscribe(ambSubscriber);
}
The source code itself contains a lot of comments, which I've removed to save space. In this code, the most important line is
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
I keep emphasizing that the most important thing when reading this source code is to look at the OnSubscribe and Subscriber subclasses. The library often wraps a layer around the original OnSubscribe and Subscriber, and you can't fully understand what happens unless you read that layer carefully.
Here, AmbSubscriber wraps a layer around the original Subscriber, so when a value is emitted, onNext and the other methods of AmbSubscriber are called first, and only then are the methods of our own Subscriber called.
The next statement is
if ((c = selection.get()) != null) {
    selection.unsubscribeOthers(c);
    return;
}
Selectable. Get () returns an AmbSubscriber object. If the object is not empty, it means that an OnSubscibe execution has been completed. We can come back to that later. You’ll understand it better.
The last sentence
source.unsafeSubscribe(ambSubscriber);
Here source is one of the Observables we passed in when calling the amb operator. It is subscribed to with the unsafeSubscribe method.
Observable
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        subscriber.onStart();
        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
.
This method is similar to the subscribe method discussed in the previous article; the difference is that the Subscriber passed in is not additionally wrapped in a SafeSubscriber.
So in the end it executes the same thing:
observable.onSubscribe.call(subscriber)
Here subscriber is the AmbSubscriber, and onSubscribe is the OnSubscribe object we passed in the demo. Let's look at the demo code again.
.
observable = Observable.amb(Observable.create(object : Observable.OnSubscribe<String> {
    override fun call(t: Subscriber<in String>) {
        t.onNext("Test1")
    }
}), Observable.create(object : Observable.OnSubscribe<String> {
    override fun call(t: Subscriber<in String>) {
        t.onNext("Test2")
    }
}))
.
The call method of the OnSubscribe objects passed in the demo is very simple: it calls the Subscriber's onNext method directly. That Subscriber is the AmbSubscriber, so this is the onNext method of AmbSubscriber.
AmbSubscriber
.
public void onNext(T t) {
    if (isSelected()) {
        subscriber.onNext(t);
    }
}
.
private boolean isSelected() {
    if (chosen) {
        return true;
    }
    if (selection.get() == this) {
        // fast-path
        chosen = true;
        return true;
    } else {
        if (selection.compareAndSet(null, this)) {
            selection.unsubscribeOthers(this);
            chosen = true;
            return true;
        } else {
            // we lost so unsubscribe ... and force cleanup again due to possible race conditions
            selection.unsubscribeLosers();
            return false;
        }
    }
}
}
.
The AmbSubscriber code is very simple, so I will just walk through it. onNext first checks whether this AmbSubscriber is the selected one. If it is, the onNext of the wrapped subscriber is executed directly. That subscriber is the one from our demo (strictly speaking, an ActionSubscriber that the library wraps around our lambda, but that is not important).
The most important code is this one
if (selection.compareAndSet(null, this)) {
    selection.unsubscribeOthers(this);
    chosen = true;
    return true;
}
Selection is an AtomicReference, and compareAndSet is one of its most commonly used methods. selection.compareAndSet(null, this) sets the value to this only if the current value is null; it returns true if the value was set and false otherwise.
The meaning of this code is therefore easy to understand. When the first AmbSubscriber calls onNext, the value of selection is set to that AmbSubscriber, and any AmbSubscriber that arrives later finds the slot already taken. After the first AmbSubscriber has set selection to itself, all the other AmbSubscribers are unsubscribed.
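The gate can be tried out on its own with nothing but the JDK. In the sketch below, `Candidate` is a hypothetical stand-in for AmbSubscriber that keeps only the selection logic; unsubscribing the losers is left out, and the names are assumptions made for this illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

class SelectionGateSketch {
    /** Hypothetical stand-in for AmbSubscriber: it only knows how to compete for the slot. */
    static final class Candidate {
        private final AtomicReference<Candidate> selection;
        private boolean chosen;

        Candidate(AtomicReference<Candidate> selection) {
            this.selection = selection;
        }

        boolean isSelected() {
            if (chosen) {
                return true;                          // already known to be the winner
            }
            if (selection.get() == this || selection.compareAndSet(null, this)) {
                chosen = true;                        // we hold the slot, or have just claimed it
                return true;
            }
            return false;                             // someone else claimed the slot first
        }
    }

    /** Two candidates ask in turn; the result lists each answer in order. */
    static String demo() {
        AtomicReference<Candidate> selection = new AtomicReference<>();
        Candidate first = new Candidate(selection);
        Candidate second = new Candidate(selection);
        return first.isSelected() + "," + second.isSelected() + "," + first.isSelected()
                + "," + (selection.get() == first);
    }
}
```

`demo()` returns "true,false,true,true": the first candidate claims the empty slot, the second is rejected, and the first keeps answering true on later calls.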
The fourth part
This part is straightforward: if the downstream Subscriber has already been unsubscribed, every AmbSubscriber in the collection is unsubscribed and the collection is cleared. This step mainly guards against the user unsubscribing while the subscription loop is still running.
The fifth part
In fact, the third part is the most important. Even without parts 4 and 5, we would already have a good understanding of the Amb operator. But part 5 is important for understanding the RxJava source code itself.
OnSubscribeAmb
subscriber.setProducer(new Producer() {
    @Override
    public void request(long n) {
        .
    }
});
Subscriber
public void setProducer(Producer p) {
    .
        if (toRequest == NOT_SET) {
            producer.request(Long.MAX_VALUE);
        } else {
            producer.request(toRequest);
        }
    }
}
In other words, setProducer calls the producer's request method directly. Of course, the real code is not quite that simple, but on a first reading it is best to stick to the simplest interpretation.
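Under that simplest interpretation, the behaviour can be sketched roughly as follows. This is not the library's code: the `Producer` interface here is a local stand-in, the `NOT_SET` value is an assumption picked for the sketch, and synchronization, overflow handling and delegation to a wrapped Subscriber are all left out.

```java
class SetProducerSketch {
    /** Hypothetical stand-in for rx.Producer. */
    interface Producer {
        void request(long n);
    }

    // Assumption: a sentinel meaning "request was never called"; the value is chosen for this sketch.
    static final long NOT_SET = Long.MIN_VALUE;

    private long requested = NOT_SET;
    private Producer producer;

    /** Before a producer exists, remember the demand; afterwards, forward it. */
    void request(long n) {
        if (producer != null) {
            producer.request(n);
        } else {
            requested = (requested == NOT_SET) ? n : requested + n;
        }
    }

    /** Hands the remembered demand to the producer, or Long.MAX_VALUE if nothing was requested. */
    void setProducer(Producer p) {
        long toRequest = requested;
        producer = p;
        if (toRequest == NOT_SET) {
            p.request(Long.MAX_VALUE);
        } else {
            p.request(toRequest);
        }
    }

    /** Returns the first amount the producer is asked for, with or without an earlier request(3). */
    static long firstRequest(boolean requestBeforehand) {
        long[] seen = {0};
        SetProducerSketch subscriber = new SetProducerSketch();
        if (requestBeforehand) {
            subscriber.request(3);
        }
        subscriber.setProducer(n -> seen[0] = n);
        return seen[0];
    }
}
```

So a Subscriber that never called request ends up asking its producer for Long.MAX_VALUE, while one that asked for 3 items beforehand passes exactly that amount along.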
So let’s go back and see
OnSubscribeAmb
subscriber.setProducer(new Producer() {
    @Override
    public void request(long n) {
        AmbSubscriber<T> c;
        if ((c = selection.get()) != null) {
            // propagate the request to that single Subscriber that won
            c.requestMore(n);
        } else {
            // propagate the request to all the amb subscribers
            for (AmbSubscriber<T> ambSubscriber : selection.ambSubscribers) {
                if (!ambSubscriber.isUnsubscribed()) {
                    // make a best endeavours check to not waste requests
                    // if first emission has already occurred
                    if (selection.get() == ambSubscriber) {
                        ambSubscriber.requestMore(n);
                        // don't need to request from other subscribers because choice has been made
                        // and request has gone to choice
                        return;
                    } else {
                        ambSubscriber.requestMore(n);
                    }
                }
            }
        }
    }
});
AmbSubscriber
.
private void requestMore(long n) {
    request(n);
}
.
As you can see, requestMore in AmbSubscriber simply calls the request method inherited from its parent class, Subscriber.
Subscriber
.
protected final void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException("number requested cannot be negative: " + n);
    }
    // if producer is set then we will request from it
    // otherwise we increase the requested count by n
    Producer producerToRequestFrom;
    synchronized (this) {
        if (producer != null) {
            producerToRequestFrom = producer;
        } else {
            addToRequested(n);
            return;
        }
    }
    // after releasing lock (we should not make requests holding a lock)
    producerToRequestFrom.request(n);
}
.
The request method of Subscriber mainly does one of two things:
- addToRequested
private void addToRequested(long n) {
    if (requested == NOT_SET) {
        requested = n;
    } else {
        final long total = requested + n;
        // check if overflow occurred
        if (total < 0) {
            requested = Long.MAX_VALUE;
        } else {
            requested = total;
        }
    }
}
It just keeps track of the total number of items requested so far.
- producerToRequestFrom.request(n);
Because our demo is very simple, with just a single operator, following the code would probably never bring us here. producerToRequestFrom is itself a Producer. We won't go into it in depth; just keep a general impression for now. The RxJava source code contains a lot of this kind of nesting: an OnSubscribe wrapped in another OnSubscribe, a Subscriber wrapped in another Subscriber.
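The overflow check in addToRequested is worth trying in isolation, since it relies on Java's wrap-around arithmetic for long values. The helper below is a hypothetical standalone function written for this illustration; it assumes both arguments are non-negative, as they are in the code shown earlier.

```java
class RequestCapSketch {
    /** Adds n to the running total, saturating at Long.MAX_VALUE instead of wrapping around. */
    static long addCapped(long requested, long n) {
        long total = requested + n;
        // For two non-negative inputs, a negative sum can only mean the addition overflowed.
        return total < 0 ? Long.MAX_VALUE : total;
    }
}
```

For example, `addCapped(2, 3)` is 5, while `addCapped(Long.MAX_VALUE - 1, 5)` yields Long.MAX_VALUE, because the raw sum wraps around to a negative number.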
Conclusion
Overall, we have now walked through the source code of the Amb operator, but some questions probably remain. For me, one question came up as soon as I started reading: what happens after part 5, once call has finished executing?
As mentioned earlier, Amb is mostly used with asynchronous sources, which means that when call returns, the onNext method of the AmbSubscribers may not have run yet. As the code above shows, the other AmbSubscribers are unsubscribed only when the onNext method of the first AmbSubscriber is executed. So with asynchronous sources, several AmbSubscribers stay in memory, waiting, until one of them emits.
That makes it easy to see why it is best to call unsubscribe manually: in asynchronous mode it is easy to leak memory otherwise.
This article refers to the previous article in many places, so if you have not read it yet, please read it first; it will make this one easier to follow.
I’m the simplest operator — Create