preface

Recently looking at some Rx source code, Rx has been the source code to people feel are more around, based on this problem I want to start from a relatively simple Rx code, try to summarize its source package routine. This article will summarize what Observables. Create does by looking at the RxJava and RxSwift source codes to summarize some of the routines for looking at the Rx source code. Since both versions of Rx are involved, some parts of this article may be slightly similar, taking into account that not everyone knows both languages at the same time. You can read the versions in the languages you are familiar with under the subheadings (ps: This article’s Rx source code is based on RxJava 2.1.16, RxSwift 5.1.1). When it comes to Rx, you can associate with responsive programming ideas, here reprint a personal feel analysis of traditional object-oriented programming ideas and responsive programming ideas better article: [Responsive programming thinking art] responsive Vs object-oriented, this article is based on THE INTRODUCTION of RxJs, here to understand its ideas.

Rx basic concepts

Rx code invocation can be summarized into three stages: event generation, event processing, and event consumption.

Create (), just(), from(), etc. - Events generate map(), filter(), etc. - events process SUBSCRIBE () - events consumeCopy the code
  • Event generation: developer calls onNext, onError… And so on.
  • Event processing: Rx rich operators, the generated data flow for the second creation, such as map conversion, filter filtering, etc.
  • Event consumption: subscription for onNext/onError/onCompleted/onDisposed custom response processing.

The following is the observables. Create code based on RxJava and RxSwift, where events are generated, processed, and consumed: ps: The RxJava sample code is written by Kotlin.

Observable.create<String> {  // it::class.java == ObservableEmitter
    Log.d(TAG, "Event generation thread:${Thread.currentThread().name}")
    it.onNext("I'm ")
    it.onComplete()
}.map {
    Log.d(TAG, "Event generation thread:${Thread.currentThread().name}")
    return@map it + "rx"
}.subscribe { // onNext
    Log.d(TAG, "Event consuming thread:${Thread.currentThread().name}")
    Log.d(TAG, it)
}
Copy the code
Observable<String>.create { (ob) -> Disposable in
            print("Event generation thread:\(Thread.current.description)")
            ob.onNext("I'm ")
            ob.onCompleted()
            return Disposables.create()
        }
        .map({ (str) -> String in
            print("Event processing thread:\(Thread.current.description)")
            return str + "rx"
        })
        .subscribe(onNext: { (str) in
            print("Event consuming thread:\(Thread.current.description)")
            print(str)
        }, onError: nil, onCompleted: nil, onDisposed: nil)
Copy the code

Ps: As an aside, you can see from the example above that the output shows that the execution thread of the function is the main thread. One might think that Rx is multithreaded by default, but this example tells us that Rx is executed by default on the thread that you subscribe on, like the main thread here. Only subscribeOn or observeOn has the thread scheduling capability. The resolution of these two operators and the issue of thread scheduling will be covered in a future article.

Let’s take a look at how event generation -> consumption is implemented in Observables. Create, ignoring the event processing phase to make the code easier to understand. Tips: because Rx source code involves more custom types, the text description may not be clear enough to cause logical confusion, can be compared with the source reading.

RxJava

Observable.create<String>

The following code parsing takes place based on the following code:

Observable.create<String> {  // it::class.java = ObservableEmitter
    Log.d(TAG, "Event generation thread:${Thread.currentThread().name}")
    it.onNext("Hello world")
    it.onComplete()
}.subscribe { // onNext
    Log.d(TAG, "Event consuming thread:${Thread.currentThread().name}")
    Log.d(TAG, it)
}
Copy the code

Before parsing the code, you can get an idea of the keywords ObservableCreate, subscribeActual, and SUBSCRIBE, which will be analyzed in detail later.

Phase 1: Create/define

This section focuses on the code that calls Observable. Create.

// Observable.java
public static Observable create(ObservableOnSubscribe source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate(source));
}
Copy the code
// ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe source;

    public ObservableCreate(ObservableOnSubscribe source) {
        this.source = source; }... }Copy the code
  • The observable. create method creates an ObservableCreate object, a subscriptable object in Rxjava. All subscriptable objects in Rxjava are associated with the Observable class, which is an abstract class. RxJavaRlugins is a helper class in Rxjava that defines hooks. See RxJavaRlugins. Java.
  • The ObservableCreate class is created by adding an ObservableOnSubscribe object originally passed in by calling Observable.create (ps: ObservableOnSubscribe is an interface that implements anonymous objects in the process of event generation by the developer. For its own member variable, so that subsequent developer calls trigger subscribe.
  • It is worth noting that the ObservableOnSubscribe object passed in the observable. create method is an interface implementation class. The event occurs in the subscribe method we define:
// ObservableOnSubscribe.java
public interface ObservableOnSubscribe<T> {
    void subscribe( ObservableEmitter emitter) throws Exception;
}
Copy the code

The above parsing content is visible source:

  • Observables. Create: observables. Java 1557 rows
  • ObservableCreate.java
  • Rxjavaplugins. Java, RxJavaPlugins introduction
  • ObservableOnSubscribe.java

Phase 2: Subscription

When we define the event generation part by observable. create, we invoke the SUBSCRIBE method to trigger the subscription logic. The following section will take you through the source code for this section.

1, when calling ObservableCreate. After the subscribe, will be to our incoming onNext, onError callback for an object encapsulation, there will be a LambdaObserver. In fact, the onNext callback we pass in is the Consumer interface implementation (ps: the onComplete callback is the Action interface implementation). The LambdaObserver object is then used to call its own subscribe overloading method (Observable.java).

// Observable.java
public final Disposable subscribe(Consumersuper T> onNext, Consumersuper Throwable> onError, Action onComplete, Consumersuper Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;
}
Copy the code

The above code occurs in line Observable.java 12013, the parent class of ObservableCreate

2. The overloaded SUBSCRIBE method is still in the superclass Observable, and its ultimate purpose is to call the LambdaObserver object in 1 to the subscribeActual method. By the way, the subscribeActual method is abstract. Actually now subclass ObservableCreate.

// Observable.java
public final void subscribe(Observersuper T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) {
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);

        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        thrownpe; }}...protected abstract void subscribeActual(Observersuper T> observer);
Copy the code

The above code

  • The subscribe method is at line 12029 of Observable.java
  • The subscribeActual method is on line observable.java 12059

3, ObservableCreate. SubscribeActual method mainly did two things, one is the instance a wrapper class CreateEmitter, used for packing our incoming LambdaObserver object, Second, we defined the ObservableOnSubscribe object (the Source object of the ObservableCreate class) to call its subscribe method, which triggers the event generation code we defined.

// ObservableCreate.java
protected void subscribeActual(Observersuper T> observer) {
    CreateEmitter parent = new CreateEmitter(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch(Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code
Observable.create {  // it::class.java = ObservableEmitter
    Log.d(TAG, "Event generation thread:${Thread.currentThread().name}")
    it.onNext("Hello world")
    it.onComplete()
}
Copy the code

Back to the beginning of our definition, here it is comments indicate it is a ObservableEmitter object, namely the above mentioned in ObservableCreate. SubscribeActual by instance wrapper class CreateEmitter parent class, So here when we call something like IT.onNext, we will fire the CreateEmitter onNext method. Here is the CreateEmitter. OnNext method:

// ObservableCreate.java
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {...final Observersuper T> observer;

    CreateEmitter(Observersuper T> observer) {
        this.observer = observer;
    }

    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if(! isDisposed()) { observer.onNext(t); }}...Copy the code

. Combining the above ObservableCreate subscribeActual method, the observer on the CreateEmitter class object, it is our first LambdaObserver object, call here the observer. OnNext (t), That actually triggers the onNext callback that we pass in.

Above source:

  • Java ObservableCreate. SubscribeActual in ObservableCreate. 35 lines
  • The CreateEmitter class starts on line 47 of Observablecreat.java

At this point, the stream of Observable. Create data has closed the loop.

conclusion

Based on the above code parsing, the author summarizes the data flow routines for RxJava:

  • Developers use the API to assemble Rx code, which is essentially a nesting of multiple integrated objects from Observable. The last thing to be nested is our own implementation of the object (source) for event generation (ps: the objects used for event generation are not uniform here, such as create, which is an ObservableOnSubscribe object, and just, which is a stereotype).
  • When the subscribe is called externally, the subscribeActual method, which is an abstract method used to process subscriptions, is triggered for each custom Observable subclass. The end result is that the subscribeActual method is called progressively until we reach the level where the event we define is generated.
  • In the subscribeActual method, a decorator object that wraps the source, the nested object at the upper level, is typically generated for event consumption (onNext…) The trigger.
  • After subscribeActual is called, the SUBSCRIBE method of the source is called when appropriate to trigger the event generation phase.
  • Points 2-3 describe the flow of assembled object data after the call to subscribe. This process can be understood as the nesting of operators from the bottom up of the subscribe.
  • After the event is generated (for example, after onNext is called), the flow of data will be passed from the level at which the event was generated down through methods such as onNext.

We can use the rxjava-observable. Create example to get a flow chart of data after calling subscribe: ps: In addition, it is not always the case that the event is generated immediately after the subscribe is triggered, as in the previous example. In most cases, the event is generated separately from the subscription.

RxSwift

Observable<String>.create

The following code parsing takes place based on the following code:

Observable<String>.create { (ob) -> Disposable in
            print("Event generation thread:\(Thread.current.description)")
            ob.onNext("Hello world")
            ob.onCompleted()
            return Disposables.create()
        }
        .subscribe(onNext: { (str) in
            print("Event consuming thread:\(Thread.current.description)")
            print(str)
        }, onError: nil, onCompleted: nil, onDisposed: nil)
Copy the code

Before parsing, it needs to be added that calling onNext, onError and other event streams in RxSwift will eventually go to func on(_ Event: Event) method, so the source code is mostly through the implementation of on method to achieve the event flow passing callback. The on method is defined in observerType.swift

// ObserverType.swift
public protocol ObserverType {
    associatedtype Element

    (*, deprecated, renamed: "Element")
    typealias E = Element

    func on(_ event: Event)
}

extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }

    public func onCompleted(a) {
        self.on(.completed)
    }

      public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}
Copy the code

We know that Event is an enumeration type that defines next, Completed, and Error, corresponding to onNext, onCompleted, and onError. By the way, ObserverType is the protocol for all observers.

Phase 1: Create/define

// Create.swift
public static func create(_ subscribe: @escaping (AnyObserver) - >Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
Copy the code
// Create.swift
final private class AnonymousObservable<Element> :Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element- > >)Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }
.
}
Copy the code
  • The Observable. Create method creates an AnonymousObservable object, a subscriptable object in RxSwift. The create method here lets us pass in a closure. This closure is used for event generation.
  • The member variable in the AnonymousObservable class is the closure passed in on create, and this closure is alias SubscribeHandler for subsequent developer calls that trigger subscribe.

Above source:

  • The create method is in line 20 of create.swift
  • The AnonymousObservable class starts on line creation.swift 64

Phase 2: Subscription

1, when calling AnonymousObservable. After the subscribe, will be to our incoming onNext, onError callback for an object encapsulation, there will be a AnonymousObserver. Finally, AnonymousObservable’s own subscribe overload method is called. The following code defines an extension method to the AnonymousObservable protocol ObservableType.

// ObservableType+Extensions.swift
public func subscribe(onNext: ((Element) - >Void)? = nil.onError: ((Swift.Error) - >Void)? = nil.onCompleted: (() - >Void)? = nil.onDisposed: (() - >Void)? = nil) -> Disposable {
            .
            let observer = AnonymousObserver<Element> { event in

                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif

                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
Copy the code

AnonymousObserver has a closure called EventHandler, whose implementation is visible in the above subscribe method, and the closure is called to the onCore method of that class, The onCore method is called when the on method of the ObserverType protocol mentioned above is called when the event is generated. (ps: here the inheritance hierarchy is more, it is recommended to check the source code.)

// AnonymousObserver.swift
final class AnonymousObserver<Element> :ObserverBase<Element> {
    typealias EventHandler = (Event<Element- > >)Void

    private let _eventHandler : EventHandler

    init(_ eventHandler: @escaping EventHandler) {
.
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event) {
        return self._eventHandler(event)
    }
.
}
Copy the code

Above source:

  • The subscribe method is in ObservableType+Extensions.swift line 39
  • AnonymousObserver.swift

}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}} So we can ignore that. As you can see from the code, the code eventually calls its own run method, which in this case is implemented by AnonymousObservable.

// Producer.swift
class Producer<Element> : Observable<Element> {
    .
    override func subscribe(_ observer: Observer) -> Disposable where Observer.Element = = Element {
        if !CurrentThreadScheduler.isScheduleRequired {
           .
            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer(a)let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run(_ observer: Observer.cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element = = Element {
        rxAbstractMethod()
    }
}
Copy the code

Above source:

  • Producer.swift

3, anonymousobServable. run method:

One is to instantiate a wrapper class, AnonymousObservableSink, that wraps the AnonymousObserver object we pass in.

Second is itself (AnonymousObservable) is a parameter called AnonymousObservableSink. The run method. AnonymousObservableSink. The realization of the run method is invoked AnonymousObservable _subscribeHandler closure objects, namely the trigger events that we define generate code.

// Create.swift
final private class AnonymousObservable<Element> :Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element- > >)Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run(_ observer: Observer.cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element = = Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)

        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
Copy the code
// Create.swift
final private class AnonymousObservableSink<Observer: ObserverType> :Sink<Observer>, ObserverType {
    typealias Element = Observer.Element
    typealias Parent = AnonymousObservable<Element>
.

    func run(_ parent: Parent) -> Disposable {
     return parent._subscribeHandler(AnyObserver(self))}.
Copy the code
Observable<String>.create { (ob) -> Disposable in
            print("Event generation thread:\(Thread.current.description)")
            ob.onNext("I'm ")
            ob.onCompleted()
            return Disposables.create()
        }
Copy the code

Back to the beginning of our definition, the type of the ob here for AnonymousObservableSink. Run the AnyObserver objects, and can be ignored as here is self AnonymousObservableSink object, . So when we call here such as ob onNext, will trigger a AnonymousObservableSink onNext method, actually will eventually go to AnonymousObservableSink. The methods on:

// Create.swift
final private class AnonymousObservableSink<Observer: ObserverType> :Sink<Observer>, ObserverType {
    .
    override init(observer: Observer.cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) = = 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) = = 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
.
Copy the code

Its forwardOn method is in its parent Sink:

// Sink.swift
final func forwardOn(_ event: Event) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }
Copy the code

Finally, we can see that the _observer object is actually the AnonymousObserver object, and calling its on method corresponds to the callback firing of the onNext closure.

Above source:

  • AnonymousObservable starts on line creation.swift 64
  • AnonymousObservableSink starts at line 25 of create.swift
  • AnonymousObservableSink. ForwardOn method in the Sink. Swift 26 line up

At this point, the stream of Observable. Create data has closed the loop.

conclusion

  • Developers use the API to assemble Rx code, essentially nested objects that inherit from the ObservableType protocol. The last thing to be nested is our own implementation of the objects used for event generation (ps: the objects used for event generation are not uniform here, such as create, which is a closure, and just, which is a paradigm).
  • When the subscribe is invoked externally, it triggers the SUBSCRIBE of each custom ObservableType protocol subclass’s abstract method for processing subscriptions, with the end effect of progressively calling the subscribe method until we reach the level where the event we defined is generated.
  • In the SUBSCRIBE method, we typically generate a decorator object that wraps the source (the nested object above) for event consumption (onNext…). The trigger. After the subscribe call is made, the subscribe method of the source is called at the appropriate time to trigger the event generation phase.
  • Points 2-3 describe the flow of assembled object data after the call to subscribe. This process can be understood as the nesting of operators from the bottom up of the subscribe.
  • After the event is generated (for example, after onNext is called), the flow of data will be passed from the level at which the event was generated down through methods such as onNext.

We can use the example rxswift-Observable. Create to get a flow chart of the data after the subscribe call: ps: In addition, it is not always the case that the event is generated immediately after the subscribe is triggered, as in the previous example. In most cases, the event is generated separately from the subscription.

The last

So far, the analysis of the create data stream of RxJava and RxSwift is finished, because the code of Rx involves a lot of custom classes, which is very easy to be confused. The author also tries to explain it by highlighting the key points and code segments. The focus of this article is to summarize some packages of Rx code through the data flow of create method. If you feel confused after reading, I suggest you comb the process through code combined with this article. The next article will focus on the subscribeOn and observeOn implementations of the two versions combined with the ideas outlined above.

Series of articles:

  • RxJava subscribeOn and observeOn source parsing
  • RxSwift subscribeOn and observeOn source parsing
  • Let’s take a look at RxJava Scheduler!
  • How is RxSwift Scheduler implemented