Preface:

Many interviews now ask about the RxJava source code. Walking straight through the source is hard to sit through, so we will start with a quiz and then look at the relevant source code.

Body:

Problem one:

// The Data class
class Data {
    public String name;

    public Data(String name) {
        this.name = name;
    }
}

Observable<Data> data = Observable.just(
      new Data("aaaa"),
      new Data("bbbb"));

// First subscription to the Observable
data.subscribe(new Consumer<Data>() {
    @Override
    public void accept(Data data) throws Exception {
        // Change the name attribute of the emitted Data to cccc
        data.name = "cccc";
    }
});

// Second subscription to the same Observable
data.subscribe(new Consumer<Data>() {
    @Override
    public void accept(Data data) throws Exception {
        // Print data.name
        Log.v("TAG", data.name);
    }
});

Question:

What does Log.v("TAG", data.name); output?


Problem two:

Observable<Integer> data1 = Observable.just(1, 2);

data1.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer d) throws Exception {
        d++;
    }
});

data1.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer d) throws Exception {
        Log.v("TAG", "d:" + d);
    }
});

Question:

What does Log.v("TAG", "d:" + d); output?


The answer: the first problem prints cccc, cccc; the second prints d:1, d:2. Did you get it right?

If not, let's analyze the code together.


Problem analysis:

Let’s look at the code for the first case:

Observable.just(
      new Data("aaaa"),
      new Data("bbbb"));

The source code of just:

public static <T> Observable<T> just(T item1, T item2) {
     ObjectHelper.requireNonNull(item1, "The first item is null");
     ObjectHelper.requireNonNull(item2, "The second item is null");

     return fromArray(item1, item2);
}

The first two lines check for null. The important part is the third line, which returns an Observable via fromArray.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromArray(T... items) {
     ObjectHelper.requireNonNull(items, "items is null");
     if (items.length == 0) {
         return empty();
     } else
     if (items.length == 1) {
         return just(items[0]);
     }
     return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

We can see that a different Observable is returned depending on how many items the caller passes in: empty() when there are none, just(items[0]) when there is one, and RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)) in all other cases. In essence, though, they are all the same. Why?

The source code of empty():

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Observable<T> empty() {
    return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE);
}

We can see that the RxJavaPlugins.onAssembly method is called here as well.

The source code of just(items[0]):

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
     ObjectHelper.requireNonNull(item, "The item is null");
     return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

The RxJavaPlugins.onAssembly method is called here too.

So the three cases produce three different Observable classes, but all of them call RxJavaPlugins.onAssembly and simply pass in a different object.
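The dispatch described above can be sketched without RxJava. This is only an illustration: SourceFactoryDemo, Source, EmptySource, SingleSource, ArraySource and the assembled counter are hypothetical names, not RxJava classes. It shows a factory that picks a different implementation by argument count while every path goes through one shared onAssembly call.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the dispatch in Observable.fromArray; not RxJava code.
class SourceFactoryDemo {

    interface Source<T> {
        List<T> items();
    }

    static final class EmptySource<T> implements Source<T> {
        public List<T> items() { return Collections.emptyList(); }
    }

    static final class SingleSource<T> implements Source<T> {
        final T item;
        SingleSource(T item) { this.item = item; }
        public List<T> items() { return Collections.singletonList(item); }
    }

    static final class ArraySource<T> implements Source<T> {
        final T[] array;
        ArraySource(T[] array) { this.array = array; }
        public List<T> items() { return Arrays.asList(array); }
    }

    // Counts how many sources went through the shared assembly point.
    static int assembled = 0;

    // Every factory path funnels through here, in the role of RxJavaPlugins.onAssembly.
    static <T> Source<T> onAssembly(Source<T> source) {
        assembled++;
        return source;
    }

    @SafeVarargs
    static <T> Source<T> fromArray(T... items) {
        if (items.length == 0) {
            return onAssembly(new EmptySource<T>());
        }
        if (items.length == 1) {
            return onAssembly(new SingleSource<T>(items[0]));
        }
        return onAssembly(new ArraySource<T>(items));
    }

    public static void main(String[] args) {
        System.out.println(fromArray().getClass().getSimpleName());         // EmptySource
        System.out.println(fromArray("a").getClass().getSimpleName());      // SingleSource
        System.out.println(fromArray("a", "b").getClass().getSimpleName()); // ArraySource
        System.out.println(assembled);                                      // 3
    }
}
```

The concrete class differs per case, but the caller only ever sees the common Source type, which is the sense in which the three cases are "the same".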

PS: Methods such as Observable.create or Observable.interval also end up calling RxJavaPlugins.onAssembly, for example RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); and RxJavaPlugins.onAssembly(new ObservableInterval(xxx, xxxx, xxxx));.

So let's look at the call RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); in detail.

The RxJavaPlugins.onAssembly method:

@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
   Function<? super Observable, ? extends Observable> f = onObservableAssembly;
   if (f != null) {
       return apply(f, source);
   }
   return source;
}

We can see that, when no hook function is set, the Observable source that was passed in is returned unchanged, so what we get back is the new ObservableFromArray<T>(items) we passed in.
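The pass-through behavior of onAssembly can be sketched with plain Java. This is a simplified illustration, not the library's code: AssemblyHookDemo and onAssemblyHook are hypothetical names, and the hook here works on Object rather than Observable.

```java
import java.util.function.Function;

// Hypothetical stand-in for the hook check in RxJavaPlugins.onAssembly; not RxJava code.
class AssemblyHookDemo {

    // Null by default, meaning no hook is installed.
    static volatile Function<Object, Object> onAssemblyHook;

    static Object onAssembly(Object source) {
        Function<Object, Object> f = onAssemblyHook;
        if (f != null) {
            return f.apply(source); // a hook may wrap or replace the source
        }
        return source;              // default: the same instance comes back
    }

    public static void main(String[] args) {
        Object source = new Object();
        System.out.println(onAssembly(source) == source); // true

        onAssemblyHook = s -> "wrapped";
        System.out.println(onAssembly(source));           // wrapped

        onAssemblyHook = null; // restore the default
    }
}
```

With no hook installed, which is the usual case in application code, the factory result is exactly the object that was constructed.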

So the Observable we end up with is the new ObservableFromArray<T>(items), and we usually follow that up with a subscription:

// The Observable here is really the new ObservableFromArray<T>(items)
Observable<Data> observable = Observable.just(
      new Data("aaaa"),
      new Data("bbbb"));

observable.subscribe(new Observer<Data>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Data data) {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
});

Let’s start with the subscribe method:

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        // Call the subscribeActual(observer) method of the Observable.
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

As the comment added in the code points out, observable.subscribe(observer) ends up executing observable.subscribeActual(observer). Since our Observable is an instance of ObservableFromArray, we go straight to its source code.

ObservableFromArray.class:

public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;

    public ObservableFromArray(T[] array) {
        this.array = array;
    }

    @Override
    public void subscribeActual(Observer<? super T> s) {
        // Create the Disposable we use to unsubscribe; here it is a FromArrayDisposable.
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

        // Call the onSubscribe method overridden in our Observer and pass it the Disposable.
        s.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        // Then execute the run method of the FromArrayDisposable.
        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

        final Observer<? super T> actual;

        final T[] array;

        int index;

        boolean fusionMode;

        volatile boolean disposed;

        // The constructor takes the Observer and the array.
        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }

        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                fusionMode = true;
                return SYNC;
            }
            return NONE;
        }

        @Nullable
        @Override
        public T poll() {
            int i = index;
            T[] a = array;
            if (i != a.length) {
                index = i + 1;
                return ObjectHelper.requireNonNull(a[i], "The array element is null");
            }
            return null;
        }

        @Override
        public boolean isEmpty() {
            return index == array.length;
        }

        @Override
        public void clear() {
            index = array.length;
        }

        // The method we call to unsubscribe.
        @Override
        public void dispose() {
            disposed = true;
        }

        // Whether the subscription has been cancelled.
        @Override
        public boolean isDisposed() {
            return disposed;
        }

        // When a subscription is made, the run method of the Disposable is what actually executes.
        void run() {
            T[] a = array;
            int n = a.length;

            /* Loop over the array we passed in, checking isDisposed() on every pass.
               This is why calling dispose() on the Disposable unsubscribes:
               disposed becomes true, the loop condition fails, and the loop exits. */
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                /* In RxJava 1 it was fine to send a null value, but in RxJava 2 it is not:
                   a null check is done here and the Observer's onError method is called. */
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                // Call the Observer's onNext method and pass it the value.
                actual.onNext(value);
            }

            /* If every onNext has run and dispose() was never called,
               the condition is true and the Observer's onComplete() method is called. */
            if (!isDisposed()) {
                actual.onComplete();
            }
        }
    }
}
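The way dispose() stops emission in run() can be tried in isolation. The sketch below is a reduced stand-in, assuming nothing from RxJava: DisposeLoopDemo, ArrayEmitter and takeUntil are hypothetical names, and the boolean returned by run only marks the point where onComplete would be called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-in for FromArrayDisposable.run(); not RxJava code.
class DisposeLoopDemo {

    static final class ArrayEmitter<T> {
        final T[] array;
        volatile boolean disposed;

        ArrayEmitter(T[] array) { this.array = array; }

        void dispose() { disposed = true; }

        // Emits items until the array ends or dispose() has been called.
        // Returns true if it ran to the end (where onComplete would fire).
        boolean run(BiConsumer<ArrayEmitter<T>, T> onNext) {
            for (int i = 0; i < array.length && !disposed; i++) {
                onNext.accept(this, array[i]);
            }
            return !disposed;
        }
    }

    // Collects emitted values and disposes as soon as stopAt is seen.
    static List<Integer> takeUntil(Integer[] items, int stopAt) {
        List<Integer> received = new ArrayList<>();
        ArrayEmitter<Integer> emitter = new ArrayEmitter<>(items);
        emitter.run((e, value) -> {
            received.add(value);
            if (value == stopAt) {
                e.dispose(); // unsubscribe from inside onNext
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeUntil(new Integer[] {1, 2, 3, 4}, 2)); // [1, 2]
    }
}
```

Because the flag is checked in the loop condition, the item being delivered when dispose() is called still arrives, and nothing after it does.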

As the code shows, once an Observable instance has been created it holds one array and sends the same Data objects on every subscription, so every Observer that subscribes receives the same Data objects. Because the first Observer modified the objects' properties, the second Observer gets the same objects with changed values.

It’s as simple as this:

Data data = new Data("aaaa");
Log.v("TAG",data.name);
change(data);
Log.v("TAG",data.name);

public void change(Data data){
     data.name = "bbbb";
}

Well, that’s one way to think about it. Both print the same object, but the properties have been changed.

So the code in problem one should make sense now.
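Problem one can also be reproduced without RxJava on the classpath. The following is a minimal sketch under that assumption: ArraySource is a hypothetical stand-in for ObservableFromArray that keeps one array and replays it to each subscriber, and a list stands in for Log.v.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-in for problem one without RxJava; ArraySource is a hypothetical name.
class SharedReferenceDemo {

    static final class Data {
        String name;
        Data(String name) { this.name = name; }
    }

    // Keeps one array and replays the same elements to every subscriber.
    static final class ArraySource<T> {
        final T[] array;

        @SafeVarargs
        ArraySource(T... array) { this.array = array; }

        void subscribe(Consumer<? super T> consumer) {
            for (T item : array) {
                consumer.accept(item);
            }
        }
    }

    static List<String> run() {
        ArraySource<Data> source = new ArraySource<>(new Data("aaaa"), new Data("bbbb"));
        List<String> printed = new ArrayList<>();

        source.subscribe(data -> data.name = "cccc");     // first subscriber mutates
        source.subscribe(data -> printed.add(data.name)); // second subscriber reads
        return printed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [cccc, cccc]
    }
}
```

If each subscription should see fresh objects, the source would have to create them per subscriber instead of holding one array; that is a design alternative, not what just does.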

The second problem is not really a test of RxJava source knowledge but of Java fundamentals. In problem two we send (1, 2), and d++ inside accept only changes the local variable d, which is equivalent to:

int data = 1;
Log.v("TAG", "data:" + data);
change(data);
Log.v("TAG", "data:" + data);

public void change(int data){
     data = 2;
}

You’ll notice that both logs actually print the same thing, 1.

Unlike some other languages, Java does not let programmers choose whether an individual argument is passed by value or by reference. Variables of primitive types (byte, short, int, long, float, double, boolean, char) are always passed by value. For an object, the method does not receive the object itself but a reference to it (the object's address). The reference itself is passed by value: the method gets a copy of the reference, so at that point two references point to the same object. Through its copy, the method can operate on the object directly. The caller sees a change only when the object itself is modified; reassigning the copied reference leaves the original object untouched.
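The four cases in the paragraph above can be checked side by side. This is a small illustrative sketch; PassByValueDemo and its method names are made up for the example. The incrementBoxed case matches problem two, where the parameter is an Integer rather than an int.

```java
// Illustrative sketch of Java argument passing; all names are hypothetical.
class PassByValueDemo {

    static final class Data {
        String name;
        Data(String name) { this.name = name; }
    }

    // Reassigning a primitive parameter only changes the local copy.
    static void reassignInt(int value) {
        value = 2;
    }

    // Integer is immutable: d++ unboxes, adds one, and stores a new Integer
    // in the local variable d. The caller's Integer is untouched.
    static void incrementBoxed(Integer d) {
        d++;
    }

    // Reassigning the parameter only changes the local copy of the reference.
    static void reassignReference(Data data) {
        data = new Data("replaced");
    }

    // Mutating through the reference changes the one shared object.
    static void mutate(Data data) {
        data.name = "cccc";
    }

    public static void main(String[] args) {
        int primitive = 1;
        reassignInt(primitive);
        System.out.println(primitive); // 1

        Integer boxed = 1;
        incrementBoxed(boxed);
        System.out.println(boxed); // 1

        Data data = new Data("aaaa");
        reassignReference(data);
        System.out.println(data.name); // aaaa

        mutate(data);
        System.out.println(data.name); // cccc
    }
}
```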

For more detail, see the article on Java value passing, reference passing, and array passing.

Conclusion:

In this chapter we mainly read the RxJava source code for creating an Observable and subscribing an Observer, and reviewed related Java knowledge such as value passing.