• Confusion between Subject and Observable + Observer [Android RxJava2] (What the hell is this) Part8
  • Hafiz Waleed Hussain
  • The Nuggets translation Project
  • Permanent link to this article: github.com/xitu/gold-m…
  • Translator: RockZhai
  • Proofread by hanliuxin5


Wow, we have an extra day, so let’s learn something new to make it a great day 🙂.

Hello, everyone. I hope you are all doing well. This is the eighth article in our series on RxJava2 for Android. In this article we will discuss Subjects in Rx.

Motivation: The motivation is the same as the one I shared in the first part of this series.

Introduction: When I started my journey with Rx, Subjects were the part that confused me most. Most of the time, when I started reading a blog, I got a definition like this: “Subjects are like an Observable and an Observer.” I am not a smart person, so this always puzzled me. After a lot of practice with Rx, one day the concept of Subjects finally clicked, and I was surprised at how powerful it is. In this article I will walk you through the concept and show you that power. I may have used it incorrectly in some places, but you will learn the concept, and by the end of this article you will be very good friends with Subjects. 🙂

If, like me, you think of Subjects as a combination of an Observer and an Observable, try to forget that idea for now. First I am going to review the Observable and Observer concepts. For Observable, I recommend reading Part 5 of this series, the dialogue between an Rx Observable and the developer (me). For Observer, I recommend the continuation of that dialogue. After that you can easily follow this article. Below I will share some of the Observable and Observer APIs with you.

This is the Observable code. As the figure shows, it has more than 3000 lines. As we know, an Observable usually uses different methods to convert data into a stream. Here is a simple example.

public static void main(String[] args) {
    List<String> list = Arrays.asList("Hafiz", "Waleed", "Hussain");
    Observable<String> stringObservable = Observable.fromIterable(list);
}

Next we need an Observer to get the data from the Observable. First, I am going to show you the Observer API.

As we have seen, the Observer is very simple, with only four methods, so now it’s time to use this Observer in our example.

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.Arrays;
import java.util.List;

/**
 * Created by waleed on 09/07/2017.
 */
public class Subjects {

    public static void main(String[] args) {
        List<String> list = Arrays.asList("Hafiz", "Waleed", "Hussain");
        Observable<String> stringObservable = Observable.fromIterable(list);

        Observer<String> stringObserver = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        stringObservable.subscribe(stringObserver);
    }
}

The output is simple. We have now reviewed the Observable and Observer APIs: once we subscribe, the Observable simply calls the methods of our Observer. The Observer’s onNext(data) method is called whenever the Observable wants to deliver data. The Observable calls the Observer’s onError(e) whenever an error occurs. The Observer’s onComplete() method is called when the Observable finishes its stream. That is the simple relationship between the two APIs.
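To make that call order concrete, here is a minimal plain-Java sketch of the producer side. It does not use RxJava: MiniObserver, emitAll and record are hypothetical names invented for this illustration, onSubscribe takes no Disposable, and treating a null item as the error case is only an assumption made to trigger onError.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ObserverContractSketch {

    // Hypothetical stand-in for the four Observer callbacks; the real
    // onSubscribe receives a Disposable, which is left out here.
    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    // Plays the producer role: onSubscribe first, then onNext per item,
    // then exactly one terminal call (onError or onComplete).
    static <T> void emitAll(List<T> items, MiniObserver<T> observer) {
        observer.onSubscribe();
        for (T item : items) {
            if (item == null) {
                // Assumption for this sketch: a null item is the error case.
                observer.onError(new NullPointerException("null item"));
                return; // nothing is delivered after an error
            }
            observer.onNext(item);
        }
        observer.onComplete();
    }

    // Records every callback as a string so the call order is visible.
    static List<String> record(List<String> items) {
        List<String> calls = new ArrayList<>();
        emitAll(items, new MiniObserver<String>() {
            @Override public void onSubscribe() { calls.add("onSubscribe"); }
            @Override public void onNext(String item) { calls.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable error) { calls.add("onError(" + error.getMessage() + ")"); }
            @Override public void onComplete() { calls.add("onComplete"); }
        });
        return calls;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext(Hafiz), onNext(Waleed), onNext(Hussain), onComplete]
        System.out.println(record(Arrays.asList("Hafiz", "Waleed", "Hussain")));
        // prints [onSubscribe, onNext(Hafiz), onError(null item)]
        System.out.println(record(Arrays.asList("Hafiz", null, "Hussain")));
    }
}
```

The second call shows that onError and onComplete are mutually exclusive: once one of them has been called, the Observer receives nothing more.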

Now I am going to start today’s topic. If you are still confused about Observables and Observers, please read the articles I mentioned above, or ask questions in the comments. I will leave the definition of Subjects in Rx to the end. First I will walk through a simpler example that lets us grasp the concept of Subjects more directly.

Observable<String> stringObservable = Observable.create(observableEmitter -> {
    observableEmitter.onNext("Event");
});

This is an Observable that emits a string.

Consumer<String> consumer = new Consumer<String>() {
    @Override
    public void accept(String s) {
        System.out.println(s);
    }
};

This is a consumer that will subscribe to an Observable.

while (true) {
    Thread.sleep(1000);
    stringObservable.subscribe(consumer);
}

This code subscribes once every second, and each subscription produces one event. I have posted the complete code below for easy reading.

import io.reactivex.Observable;
import io.reactivex.functions.Consumer;

public class Subjects {

    public static void main(String[] args) throws InterruptedException {

        Observable<String> stringObservable = Observable.create(observableEmitter -> {
            observableEmitter.onNext("Event");
        });

        Consumer<String> consumer = new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        };

        while (true) {
            Thread.sleep(1000);
            stringObservable.subscribe(consumer);
        }
    }
}

Output: Event Event Event Event
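One event appears per second because an Observable built with create runs its emitter code again for every subscription. The plain-Java sketch below models only that one behaviour. ColdSource and run are hypothetical names invented for this illustration, and the sketch leaves out everything else an Observable does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ColdSourceSketch {

    // Hypothetical toy class, not RxJava's Observable: it keeps the producer
    // function and runs it again for every subscriber.
    static class ColdSource {
        private final Consumer<Consumer<String>> producer;

        ColdSource(Consumer<Consumer<String>> producer) {
            this.producer = producer;
        }

        void subscribe(Consumer<String> subscriber) {
            producer.accept(subscriber);
        }
    }

    // Subscribes the given number of times and collects what was received.
    static List<String> run(int subscriptions) {
        List<String> received = new ArrayList<>();
        int[] producerRuns = {0}; // counts how often the producer body executes
        ColdSource source = new ColdSource(emitter -> {
            producerRuns[0]++;
            emitter.accept("Event (producer run " + producerRuns[0] + ")");
        });
        for (int i = 0; i < subscriptions; i++) {
            source.subscribe(received::add);
        }
        return received;
    }

    public static void main(String[] args) {
        // prints [Event (producer run 1), Event (producer run 2), Event (producer run 3)]
        System.out.println(run(3));
    }
}
```

Nothing is emitted until someone subscribes, and the source itself has no way to push an event from outside. That is the gap the rest of the article fills.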

This is a simple example, and I don’t think it needs much explanation. Now comes the interesting part: I will write a new example with a different technique that produces the same output. Before diving in, take a look at the following code.

class ObservableObserver extends Observable<String> implements Observer<String>

This is simple. I created a new class named ObservableObserver that extends Observable and implements the Observer interface, which means it can act as both an Observable and an Observer. I don’t think there is any doubt about that. We already know that an Observable always generates a stream, so this class has that ability too, because it extends Observable. We also know that an Observer can observe any Observable’s stream by subscribing to it, so our new class can do the same, because it implements the Observer interface. BOOM. Very simple. Now I’m going to show you the entire code. It is only meant to explain the concept, so don’t treat it as production-ready code.

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

class ObservableObserver extends Observable<String> implements Observer<String> {

    private Observer<? super String> observer;

    @Override
    protected void subscribeActual(Observer<? super String> observer) { // Observable abstract method
        this.observer = observer;
    }

    @Override
    public void onSubscribe(Disposable disposable) { // Observer API
        if (observer != null) {
            observer.onSubscribe(disposable);
        }
    }

    @Override
    public void onNext(String s) { // Observer API
        if (observer != null) {
            observer.onNext(s);
        }
    }

    @Override
    public void onError(Throwable throwable) { // Observer API
        if (observer != null) {
            observer.onError(throwable);
        }
    }

    @Override
    public void onComplete() { // Observer API
        if (observer != null) {
            observer.onComplete();
        }
    }

    public Observable<String> getObservable() {
        return this;
    }
}

Here’s another very simple class. We’ve already used all the methods above, except that we use the related Observable and Observer methods in the same class.

public static void main(String[] args) throws InterruptedException {

    ObservableObserver observableObserver = new ObservableObserver();
    observableObserver.getObservable().subscribe(System.out::println);

    while (true) {
        Thread.sleep(1000);
        observableObserver.onNext("Event");
    }
}

Output: Event Event Event

Two lines in the code above are very important, so let me explain them. observableObserver.getObservable(): here I get the Observable from the ObservableObserver class and subscribe an Observer to it. observableObserver.onNext("Event"): here I call the Observer API method whenever an event occurs. Because the class is self-contained, I benefit from it being both an Observable and an Observer. Now for a surprise: you have already mastered the concept of Subjects. If you don’t believe me, take a look at the code in the picture below:

This is the code for the RxJava2 Subject class. Now you can see why Subjects are said to be both Observables and Observers: the class uses the API methods of both. RxJava provides several types of Subjects, which we will discuss next.

There are four types of Subjects you can get in RxJava:
1. Publish Subject
2. Behaviour Subject
3. Replay Subject
4. Async Subject

public static void main(String[] args) throws InterruptedException {

    Subject<String> subject = PublishSubject.create();
//  Subject<String> subject = BehaviorSubject.create();
//  Subject<String> subject = ReplaySubject.create();
//  Subject<String> subject = AsyncSubject.create(); // I will explain this one at the end

    subject.subscribe(System.out::println);

    int eventCounter = 0;
    while (true) {
        Thread.sleep(100);
        subject.onNext("Event " + (++eventCounter));
    }
}

Output: Event 1 Event 2 Event 3 Event 4 Event 5 Event 6 Event 7 Event 8 Event 9 Event 10

In general, if you run the code above, you will see that every Subject except AsyncSubject produces the same output. Now it is time to tell the types of Subjects apart.

1. Publish Subject: With this type of Subject we get real-time data. For example, suppose I have a Publish Subject that delivers sensor data. When I subscribe to it, I only get the values published from then on, as the following example shows:

public static void main(String[] args) throws InterruptedException {

    Subject<String> subject = PublishSubject.create();
    int eventCounter = 0;
    while (true) {
        Thread.sleep(100);
        subject.onNext("Event " + (++eventCounter));

        if (eventCounter == 10) {
            subject.subscribe(System.out::println);
        }
    }
}

Output: Event 11 Event 12 Event 13 Event 14 Event 15 Event 16

So, in this case, the Publish Subject started publishing at Event 1 and had already published up to Event 10 by the time I subscribed. As you can see, the output therefore starts at Event 11.
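The reason the late subscriber misses the earlier events can be shown with a minimal plain-Java sketch. MiniPublishSubject and run are hypothetical names invented for this illustration. This is not RxJava's PublishSubject, only a toy model of the one property discussed here: values are forwarded to current subscribers and never stored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PublishSketch {

    // Hypothetical toy class, not RxJava's PublishSubject: it forwards each
    // value to whoever is subscribed at that moment and stores nothing.
    static class MiniPublishSubject {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void onNext(String value) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    // Emits "Event 1".."Event <total>" and subscribes right after
    // "Event <subscribeAt>" has been published.
    static List<String> run(int total, int subscribeAt) {
        List<String> received = new ArrayList<>();
        MiniPublishSubject subject = new MiniPublishSubject();
        for (int i = 1; i <= total; i++) {
            subject.onNext("Event " + i);
            if (i == subscribeAt) {
                subject.subscribe(received::add);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // prints [Event 4, Event 5]
        System.out.println(run(5, 3));
    }
}
```

Because onNext only loops over the subscribers that exist at that moment, anything published before subscribe is simply gone.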

2. Behaviour Subject: With this type of Subject, we get the last value the Subject published before we subscribed, plus every new value it publishes afterwards. For simplicity, read the code below.

public static void main(String[] args) throws InterruptedException {

    Subject<String> subject = BehaviorSubject.create();
    int eventCounter = 0;
    while (true) {
        Thread.sleep(100);
        subject.onNext("Event " + (++eventCounter));

        if (eventCounter == 10) {
            subject.subscribe(System.out::println);
        }
    }
}

Output: Event 10 Event 11 Event 12 Event 13 Event 14 Event 15

As you can see in the output, I also got the “Event 10” value, which was published before I subscribed. So I can use this type of Subject whenever I also want the last value published before my subscription.
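A minimal plain-Java sketch of that behaviour needs only one extra field compared with a subject that stores nothing. MiniBehaviorSubject and run are hypothetical names invented for this illustration. This is not RxJava's BehaviorSubject, only a toy model that caches the most recent value and hands it to each new subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BehaviorSketch {

    // Hypothetical toy class, not RxJava's BehaviorSubject: it remembers only
    // the most recent value and replays it to each new subscriber.
    static class MiniBehaviorSubject {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private String latest; // null until the first value is published

        void subscribe(Consumer<String> subscriber) {
            if (latest != null) {
                subscriber.accept(latest); // catch up with the cached value
            }
            subscribers.add(subscriber);
        }

        void onNext(String value) {
            latest = value;
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    // Emits "Event 1".."Event <total>" and subscribes right after
    // "Event <subscribeAt>" has been published.
    static List<String> run(int total, int subscribeAt) {
        List<String> received = new ArrayList<>();
        MiniBehaviorSubject subject = new MiniBehaviorSubject();
        for (int i = 1; i <= total; i++) {
            subject.onNext("Event " + i);
            if (i == subscribeAt) {
                subject.subscribe(received::add);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // prints [Event 3, Event 4, Event 5]
        System.out.println(run(5, 3));
    }
}
```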

3. Replay Subject: With this type of Subject, I get every value that has been published, no matter when I subscribe. For simplicity’s sake, I’ll go straight to the code.

public static void main(String[] args) throws InterruptedException {

    Subject<String> subject = ReplaySubject.create();
    int eventCounter = 0;
    while (true) {
        Thread.sleep(100);
        subject.onNext("Event " + (++eventCounter));

        if (eventCounter == 10) {
            subject.subscribe(System.out::println);
        }
    }
}

Output: Event 1 Event 2 Event 3 Event 4 Event 5 Event 6 Event 7 Event 8 Event 9 Event 10 Event 11 Event 12

Here I again subscribe at Event 10, but I still get all the historical data. It’s that easy.
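The replay behaviour comes down to keeping a history buffer, as the following plain-Java sketch shows. MiniReplaySubject and run are hypothetical names invented for this illustration. This is not RxJava's ReplaySubject, only a toy model with an unbounded buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ReplaySketch {

    // Hypothetical toy class, not RxJava's ReplaySubject: it buffers every
    // published value and replays the whole buffer to each new subscriber.
    static class MiniReplaySubject {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private final List<String> history = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            for (String past : history) {
                subscriber.accept(past); // replay everything published so far
            }
            subscribers.add(subscriber);
        }

        void onNext(String value) {
            history.add(value);
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    // Emits "Event 1".."Event <total>" and subscribes right after
    // "Event <subscribeAt>" has been published.
    static List<String> run(int total, int subscribeAt) {
        List<String> received = new ArrayList<>();
        MiniReplaySubject subject = new MiniReplaySubject();
        for (int i = 1; i <= total; i++) {
            subject.onNext("Event " + i);
            if (i == subscribeAt) {
                subject.subscribe(received::add);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // prints [Event 1, Event 2, Event 3, Event 4, Event 5]
        System.out.println(run(5, 3));
    }
}
```

The price of this convenience is memory: in this sketch the history list grows for as long as values keep arriving.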

4. Async Subject: With this type of Subject, we only get the last value that the Subject publishes before it completes.

public static void main(String[] args) throws InterruptedException {

    Subject<String> subject = AsyncSubject.create();
    subject.subscribe(System.out::println);
    int eventCounter = 0;
    while (true) {
        Thread.sleep(100);
        subject.onNext("Event " + (++eventCounter));

        if (eventCounter == 10) {
            subject.onComplete();
            break;
        }
    }
}

Output: Event 10 Process finished with exit code 0

Here you can see that I complete the Subject at value 10, and I get the Event 10 output once the Subject completes, just before the program exits. In other words, whenever I only want the last value a Subject published, I can use an Async Subject.
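The plain-Java sketch below models why nothing is printed until onComplete is called. MiniAsyncSubject and run are hypothetical names invented for this illustration. This is not RxJava's AsyncSubject, only a toy model that holds back the latest value until completion and ignores the error path entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AsyncSketch {

    // Hypothetical toy class, not RxJava's AsyncSubject: onNext only remembers
    // the value, and subscribers hear about it when onComplete is called.
    static class MiniAsyncSubject {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private String latest; // null until the first value is published
        private boolean completed;

        void subscribe(Consumer<String> subscriber) {
            if (completed) {
                if (latest != null) {
                    subscriber.accept(latest); // late subscriber still gets the final value
                }
            } else {
                subscribers.add(subscriber);
            }
        }

        void onNext(String value) {
            if (!completed) {
                latest = value; // stored, not delivered yet
            }
        }

        void onComplete() {
            if (completed) {
                return;
            }
            completed = true;
            if (latest != null) {
                for (Consumer<String> subscriber : subscribers) {
                    subscriber.accept(latest);
                }
            }
        }
    }

    // Subscribes first, emits "Event 1".."Event <total>", then completes.
    static List<String> run(int total) {
        List<String> received = new ArrayList<>();
        MiniAsyncSubject subject = new MiniAsyncSubject();
        subject.subscribe(received::add);
        for (int i = 1; i <= total; i++) {
            subject.onNext("Event " + i);
        }
        subject.onComplete();
        return received;
    }

    public static void main(String[] args) {
        // prints [Event 10]
        System.out.println(run(10));
    }
}
```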

To recap:
Publish Subject: I don’t care about the history published before I subscribed; I only care about new values.
Behaviour Subject: I care about the last value published by this Subject and the new values.
Replay Subject: I care about all the historical data as well as the new values.
Async Subject: I only care about the last value the Subject emitted before it completed.

Conclusion: Hello friends, I hope this concept is clear to you now. Do your best to put it into practice. Now I’d like to say goodbye and wish you all a good weekend. 🙂

