3. Combination

(1) Merge: merges the event sequences emitted by two Observables into a single event sequence, as if one Observable had emitted them all. You can simply think of it as two Observables merged into one Observable; the order of the merged data is not guaranteed.


        String[] array1 = {"kpioneer", "Tiger", "Cook", "Zhang", "Haocai"};
        String[] array2 = {"WangWu", "Zhangsan", "Lisi", "Luo"};
        // Merge both sources into a single Observable
        Observable<String> merge = Observable.merge(Observable.from(array1), Observable.from(array2));
        merge.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {}

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onNext(String s) {
                Log.e("main", "Output: " + s);
            }
        });

Result output:

        08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: Output: kpioneer
        08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: Output: Tiger
        08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: Output: Cook
        08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: Output: Zhang
        08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: Output: Haocai
        08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: Output: WangWu
        08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: Output: Zhangsan
        08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: Output: Lisi
        08-08 08:50:12.518 3958-3958/com.haocai.architect.rxjava E/main: Output: Luo

Note: with merge, as soon as one of the Observables emits an error, the merged sequence terminates and the remaining items are not delivered. If an error in one sequence should not affect the other sequences, use the mergeDelayError method, which holds the error back until the other sequences have finished.
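To make the difference between the two behaviours concrete, here is a minimal library-free sketch in plain Java. It is not RxJava code: it models each source as a list of suppliers and runs the sources one after another, which is how the synchronous array sources above behave. The class `MergeModel`, its methods, the `Result` holder and the two sample sources are hypothetical names invented for this illustration, and the model keeps only the first error it sees.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** Plain-Java model (not RxJava) of merge versus mergeDelayError. */
public class MergeModel {

    /** What the subscriber saw: the items, plus the error if one was delivered. */
    public static final class Result {
        public final List<String> items = new ArrayList<>();
        public RuntimeException error; // stays null when the run completed normally
    }

    /** merge-like: the first error ends the whole merged sequence. */
    @SafeVarargs
    public static Result merge(List<Supplier<String>>... sources) {
        Result result = new Result();
        for (List<Supplier<String>> source : sources) {
            for (Supplier<String> item : source) {
                try {
                    result.items.add(item.get());
                } catch (RuntimeException e) {
                    result.error = e;
                    return result; // nothing after the error is delivered
                }
            }
        }
        return result;
    }

    /** mergeDelayError-like: a failing source stops, the other sources still run. */
    @SafeVarargs
    public static Result mergeDelayError(List<Supplier<String>>... sources) {
        Result result = new Result();
        for (List<Supplier<String>> source : sources) {
            for (Supplier<String> item : source) {
                try {
                    result.items.add(item.get());
                } catch (RuntimeException e) {
                    if (result.error == null) result.error = e; // remember it, report at the end
                    break; // only this source is finished
                }
            }
        }
        return result;
    }

    /** Hypothetical source: emits "A", then fails. */
    public static List<Supplier<String>> failingSource() {
        Supplier<String> ok = () -> "A";
        Supplier<String> boom = () -> { throw new IllegalStateException("boom"); };
        return Arrays.asList(ok, boom);
    }

    /** Hypothetical source: emits "1" and "2" without errors. */
    public static List<Supplier<String>> healthySource() {
        Supplier<String> one = () -> "1";
        Supplier<String> two = () -> "2";
        return Arrays.asList(one, two);
    }

    public static void main(String[] args) {
        System.out.println(merge(failingSource(), healthySource()).items);           // [A]
        System.out.println(mergeDelayError(failingSource(), healthySource()).items); // [A, 1, 2]
    }
}
```

In both runs the error is still reported; the only difference is whether the healthy source gets to deliver its items first.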

(2) Zip: zip(Observable, Observable, Func2) combines the data items emitted by two Observables pair by pair. Func2 produces a new value from each pair, and that value is emitted. When one of the Observables finishes sending data or an error occurs, the other one stops sending data as well.

Simply put, the zip operator combines the items of multiple data streams and emits the combined results.

Flow chart:


        String[] array1 = {"kpioneer", "Tiger", "Cook", "Zhang", "Haocai"};
        String[] array2 = {"WangWu", "Zhangsan", "Lisi", "Luo"};
        // The combining rule is up to the caller: Func2 decides how each pair is merged
        Observable<String> zip = Observable.zip(Observable.from(array1), Observable.from(array2), new Func2<String, String, String>() {
            @Override
            public String call(String s, String s2) {
                return s + "--" + s2;
            }
        });
        zip.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {}

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onNext(String s) {
                Log.e("main", "Output: " + s);
            }
        });

Result output:

        08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: Output: kpioneer--WangWu
        08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: Output: Tiger--Zhangsan
        08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: Output: Cook--Lisi
        08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: Output: Zhang--Luo

Note that Haocai is never output: zip only produces as many results as the shortest array has items.
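The pairing rule can be modelled without the library. The following plain-Java sketch is not RxJava code; `ZipModel` and its `zip` method are hypothetical names used only here. It combines the items at the same position in two lists and stops when the shorter list runs out, which is why the fifth item has no partner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

/** Plain-Java model (not RxJava) of how zip pairs items by position. */
public class ZipModel {

    /** Combines the i-th items of both lists; stops when the shorter list runs out. */
    public static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> combiner) {
        int pairs = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> array1 = Arrays.asList("kpioneer", "Tiger", "Cook", "Zhang", "Haocai");
        List<String> array2 = Arrays.asList("WangWu", "Zhangsan", "Lisi", "Luo");
        // "Haocai" has no partner in array2, so it never appears in the result
        System.out.println(zip(array1, array2, (s, s2) -> s + "--" + s2));
        // prints [kpioneer--WangWu, Tiger--Zhangsan, Cook--Lisi, Zhang--Luo]
    }
}
```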


        String[] array1 = {"kpioneer", "Tiger", "Cook", "Zhang", "Haocai"};
        String[] array2 = {"WangWu", "Zhangsan", "Lisi", "Luo"};
        Integer[] array3 = {100, 1000, 10000};

        // Func3 combines one item from each of the three sources
        Observable<String> zip = Observable.zip(Observable.from(array1), Observable.from(array2), Observable.from(array3), new Func3<String, String, Integer, String>() {
            @Override
            public String call(String s, String s2, Integer integer) {
                return s + "-" + s2 + "-" + integer;
            }
        });
        zip.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {}

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onNext(String s) {
                Log.e("main", "Output: " + s);
            }
        });

Result output:

        08-08 10:02:06.519 4330-4330/com.haocai.architect.rxjava E/main: Output: kpioneer-WangWu-100
        08-08 10:02:06.519 4330-4330/com.haocai.architect.rxjava E/main: Output: Tiger-Zhangsan-1000
        08-08 10:02:06.520 4330-4330/com.haocai.architect.rxjava E/main: Output: Cook-Lisi-10000

(3) Join: the previous two methods, zip() and merge(), work purely on the emitted data. In some scenarios we also need to take time into account before deciding how to combine values. RxJava’s join() function combines the data emitted by two Observables based on time windows. Let’s go through the four parameters of join(Observable, Func1, Func1, Func2):

Observable: the Observable to be combined with the source Observable. Let’s call it the target Observable.

Func1: receives each item emitted by the source Observable and returns an Observable whose lifetime determines how long that item stays valid.

Func1: receives each item emitted by the target Observable and returns an Observable whose lifetime determines how long that item stays valid.

Func2: receives one item from the source Observable and one from the target Observable, combines the two and returns the result.

So the syntax of the join operator looks roughly like this: observableA.join(observableB, function that controls the validity period of observableA's items, function that controls the validity period of observableB's items, function that combines the two items)


        String[] array1 = {"A", "B", "C"};
        String[] array2 = {"1", "2"};

        Observable<String> observable1 = Observable.from(array1);
        Observable<String> observable2 = Observable.from(array2);

        Observable<String> join = observable1.join(observable2, new Func1<String, Observable<Long>>() {
            @Override
            public Observable<Long> call(String s) {
                // Each item from observable1 stays valid for 2 seconds
                return Observable.timer(2, TimeUnit.SECONDS);
            }
        }, new Func1<String, Observable<Long>>() {
            @Override
            public Observable<Long> call(String s) {
                // Each item from observable2 stays valid for 2 seconds
                return Observable.timer(2, TimeUnit.SECONDS);
            }
        }, new Func2<String, String, String>() {
            @Override
            public String call(String o, String o2) {
                return o + "-" + o2;
            }
        });
        join.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {}

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onNext(String s) {
                Log.e("main", "Output: " + s);
            }
        });

Result output:

        08-08 12:21:13.407 32015-32015/com.haocai.architect.rxjava E/main: Output: A-1
        08-08 12:21:13.407 32015-32015/com.haocai.architect.rxjava E/main: Output: B-1
        08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: Output: C-1
        08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: Output: A-2
        08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: Output: B-2
        08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: Output: C-2
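The time-window rule behind this output can be sketched in plain Java. This is a simplified model, not RxJava code: `JoinModel`, `Event` and `sample` are hypothetical names, the events must be listed in arrival order, every item on one side gets the same fixed window, and treating a window as closed exactly at its end time is an assumption of the model. Each arriving item is combined with every item from the other side whose window is still open.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not RxJava) of join's time-window matching. */
public class JoinModel {

    /** One emission: which side it came from, its value and its arrival time in ms. */
    public static final class Event {
        final boolean fromSource; // true = source Observable, false = target Observable
        final String value;
        final long timeMs;

        public Event(boolean fromSource, String value, long timeMs) {
            this.fromSource = fromSource;
            this.value = value;
            this.timeMs = timeMs;
        }
    }

    /**
     * Replays events in arrival order. Each new item is combined with every earlier
     * item from the other side whose window (arrival time + window length) is still open.
     */
    public static List<String> join(List<Event> events, long sourceWindowMs, long targetWindowMs) {
        List<Event> seen = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Event now : events) {
            for (Event earlier : seen) {
                if (earlier.fromSource == now.fromSource) continue; // same side: never paired
                long window = earlier.fromSource ? sourceWindowMs : targetWindowMs;
                if (now.timeMs < earlier.timeMs + window) {
                    // The combiner always receives (source item, target item)
                    Event s = now.fromSource ? now : earlier;
                    Event t = now.fromSource ? earlier : now;
                    out.add(s.value + "-" + t.value);
                }
            }
            seen.add(now);
        }
        return out;
    }

    /** Hypothetical helper: A, B, C from the source at time 0, then 1, 2 from the target. */
    public static List<Event> sample(long targetArrivalMs) {
        List<Event> events = new ArrayList<>();
        for (String v : new String[] {"A", "B", "C"}) events.add(new Event(true, v, 0));
        for (String v : new String[] {"1", "2"}) events.add(new Event(false, v, targetArrivalMs));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(join(sample(0), 2000, 2000));    // [A-1, B-1, C-1, A-2, B-2, C-2]
        System.out.println(join(sample(3000), 2000, 2000)); // [] because the windows have closed
    }
}
```

With the target items arriving inside the 2-second window, the model reproduces the order of the log above; if they arrive after the window has closed, nothing is combined.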

4. Thread control

(1) By default, RxJava follows the principle of thread invariance: events are produced on the thread that calls subscribe(), and they are consumed on the thread that produced them. If threads need to be switched, a Scheduler is used. In RxJava, a Scheduler acts as a thread controller through which you specify which thread each piece of code runs on. RxJava has several built-in Schedulers that cover most usage scenarios:

Schedulers.immediate(): runs directly on the current thread. This is the default Scheduler.

Schedulers.newThread(): always starts a new thread and performs the operation on it.

Schedulers.io(): the Scheduler used for I/O operations (reading and writing files, reading and writing databases, network communication, etc.). The behavior of io() is similar to that of newThread(), except that io() is internally backed by a thread pool with no upper limit and can reuse idle threads, so in most cases io() is more efficient than newThread(). Don’t put computation work in io(), to avoid creating unnecessary threads.

Schedulers.computation(): the Scheduler used for computation. Computation here means CPU-intensive work whose performance is not limited by operations such as I/O, for example graphics calculations. This Scheduler uses a fixed thread pool whose size is the number of CPU cores. Don’t put I/O operations in computation(), otherwise the time spent waiting for I/O wastes CPU.

In addition, Android has the dedicated AndroidSchedulers.mainThread(), which specifies that the operation runs on the Android main thread.
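As a rough analogy for the pool shapes described above (and not a statement about how RxJava implements its Schedulers), the two kinds of pool can be built with `java.util.concurrent`. `PoolAnalogy`, `ioLike` and `computationLike` are hypothetical names, and the casts to `ThreadPoolExecutor` assume the pool type that current JDKs return from these factory methods.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/** java.util.concurrent analogy (not RxJava's implementation) for io() and computation(). */
public class PoolAnalogy {

    /** io()-like: no practical upper bound on threads, idle threads are reused. */
    public static ExecutorService ioLike() {
        return Executors.newCachedThreadPool();
    }

    /** computation()-like: a fixed number of threads, one per CPU core. */
    public static ExecutorService computationLike() {
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args) {
        // Assumption: both factories return a ThreadPoolExecutor, as they do in current JDKs
        ThreadPoolExecutor io = (ThreadPoolExecutor) ioLike();
        ThreadPoolExecutor cpu = (ThreadPoolExecutor) computationLike();
        System.out.println("io-like max threads: " + io.getMaximumPoolSize());
        System.out.println("computation-like max threads: " + cpu.getMaximumPoolSize());
        io.shutdown();
        cpu.shutdown();
    }
}
```

Blocking work on the fixed pool ties up one of a small number of threads, which is the reason the text advises against running I/O on computation().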

With these schedulers in place, threads can be controlled using the subscribeOn() and observeOn() methods.

subscribeOn(): specifies the thread on which subscribe() happens, that is, the thread on which Observable.OnSubscribe is activated. It is also called the event-producing thread.

observeOn(): specifies the thread on which the Subscriber runs. It is also called the event-consuming thread.

Descriptions in words are always hard to follow, so here is the code:


Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // Specify subscribe() to the IO thread
    .observeOn(AndroidSchedulers.mainThread()) // Specify the Subscriber callback to occur on the main thread (i.e. Action1 object)
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

In the code above, because of subscribeOn(Schedulers.io()), the created events 1, 2, 3 and 4 are emitted on the IO thread; and because of observeOn(AndroidSchedulers.mainThread()), the subscriber prints the numbers on the main thread. In fact, writing the two calls subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()) just before subscribe() is very common. It fits most “fetch data on a background thread, display it on the main thread” strategies.
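The same "produce on one thread, consume on another" handoff can be imitated with standard-library executors, which may help when reasoning about which thread runs what. This is an analogy in plain Java, not RxJava code: `ThreadHandoff`, the thread names "io" and "main-like", and the value 42 are all made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Plain-Java analogy (not RxJava) for "produce on one thread, consume on another". */
public class ThreadHandoff {

    /** Runs a producer on a thread named "io" and a consumer on a thread named "main-like". */
    public static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    // Plays the role of subscribeOn(Schedulers.io()): where the value is produced
                    .supplyAsync(() -> {
                        log.add("produce on " + Thread.currentThread().getName());
                        return 42;
                    }, io)
                    // Plays the role of observeOn(...): where the value is consumed
                    .thenAcceptAsync(number ->
                            log.add("consume " + number + " on " + Thread.currentThread().getName()), ui)
                    .join(); // wait until the consumer has finished
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // prints:
        // produce on io
        // consume 42 on main-like
    }
}
```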

For example, loading an image:

Traditional way to load images:


    private ImageView iv_image;
    private static String URL_STR ="http://pic36.nipic.com/20131203/3822951_101052690000_2.jpg";

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_scheduler);
         iv_image = (ImageView)findViewById(R.id.iv_image);
        loadImage();
    }
    // The traditional way: download images
    // Thread+Handler
    // AsyncTask

    private Bitmap download(String urlString) {
        try {
            URL url = new URL(urlString);
            HttpURLConnection connection = (HttpURLConnection) url
                    .openConnection();
            InputStream inputStream = connection.getInputStream();
            return BitmapFactory.decodeStream(inputStream);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private void loadImage() {
        new DownloadTask().execute();
    }
    class DownloadTask extends AsyncTask<Void, Void, Bitmap> {

        @Override
        protected Bitmap doInBackground(Void... params) {
            return download(URL_STR);
        }

        @Override
        protected void onPostExecute(Bitmap bitmap) {
            super.onPostExecute(bitmap);
            iv_image.setImageBitmap(bitmap);
        }
    }

RxJava:


   private static String URL_STR = "http://pic36.nipic.com/20131203/3822951_101052690000_2.jpg";
   private ImageView iv_image;

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_scheduler);
        iv_image = (ImageView) findViewById(R.id.iv_image);
        rxJavaLoadImage();
    }
    //RxJava implementation download
    private void rxJavaLoadImage() {
        // The Url is converted to a Bitmap
        // We need to specify the thread on which these events are executed
        Observable.just(URL_STR).map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String s) {
                return download(s);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())// The RxAndroid library needs to be imported
                .subscribe(new Action1<Bitmap>() {
                    @Override
                    public void call(Bitmap bitmap) {
                        // Update the UI
                        iv_image.setImageBitmap(bitmap);
                    }
                });
    }

Schedulers.io(): The Scheduler used for I/O operations (read and write to files, read and write to databases, network information interaction, etc.).

Note: 1. In RxJava, the whole process is divided into the production and the consumption of events. 2. The roles involved are Observable, Observer, Subscriber and Subject: Observables and Subjects are the two “producing” entities, while Observers and Subscribers are the two “consuming” entities.

What is the difference between subscribeOn and observeOn in thread control?

1. subscribeOn specifies the thread that produces events; observeOn specifies the thread that consumes them.

2. subscribeOn applies to the sequence from the source down to the first observeOn; observeOn applies only to the part of the sequence that comes after it.

Note: subscribeOn can be called multiple times in a sequence, but an additional call only makes sense for work that runs before the events are produced (doOnSubscribe), for example when some UI has to be initialized before accessing the network.

An example to demonstrate:


    // RxJava implementation of the download
    private void rxJavaLoadImage() {
        // The URL is converted to a Bitmap
        // We need to specify the thread on which each of these steps runs
        Observable.just(URL_STR).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                // Called before events are produced; use it for initialization work,
                // which may run on a child thread or on the main thread.
                // UI updates must happen on the main thread
                iv_image.setVisibility(View.VISIBLE);
                // Step 1: initialize
                Log.e("main", "Current thread: " + Thread.currentThread().getName());
            }
        }).subscribeOn(AndroidSchedulers.mainThread()).observeOn(Schedulers.io()).map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String s) {
                // Step 2: download the image
                Log.e("main", "Current thread: " + Thread.currentThread().getName());
                return download(s);
            }
        }).map(new Func1<Bitmap, Bitmap>() {
            @Override
            public Bitmap call(Bitmap t) {
                Log.e("main", "Current thread: " + Thread.currentThread().getName());
                // Add a watermark, crop, convert to grayscale and so on (image processing)
                // Step 3: image processing
                return t;
            }
        }).observeOn(AndroidSchedulers.mainThread()) // The RxAndroid library needs to be imported
                .subscribe(new Action1<Bitmap>() {
                    @Override
                    public void call(Bitmap bitmap) {
                        Log.e("main", "Current thread: " + Thread.currentThread().getName());
                        // Update the UI
                        iv_image.setImageBitmap(bitmap);
                    }
                });
    }

Result output:

        08-09 06:53:29.852 17616-17616/com.haocai.architect.rxjava E/main: Current thread: main
        08-09 06:53:29.854 17616-17647/com.haocai.architect.rxjava E/main: Current thread: RxIoScheduler-2
        08-09 06:53:29.983 17616-17647/com.haocai.architect.rxjava E/main: Current thread: RxIoScheduler-2
        08-09 06:53:30.072 17616-17616/com.haocai.architect.rxjava E/main: Current thread: main

Here main is the main thread and RxIoScheduler is a child thread.