preface

In the last article we looked at conversion class operators. In this article we will look at RxJava filter class operators together. Filter operators are mainly used to filter and filter event data, and only return the data that meets the conditions. Let’s take a look at what there are.

Filter operator

Filter

The filter operator, which filters unwanted data from a sequence according to certain constraints, returns only data that meets the criteria to the observer.

// Combined with flatmap, filter out houses with house size greater than 120 square meters in each community
Observable.from(communities)
        .flatMap(new Func1<Community, Observable<House>>() {
            @Override
            public Observable<House> call(Community community) {
                return Observable.from(community.getHouses());
            }
        })
        .filter(new Func1<House, Boolean>() {
            @Override
            public Boolean call(House house) {
                return house.getSize() > 120f;
            }
        })
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                Log.e("rx_test"."Filter: house greater than 120 square:" + house.getCommunityName() + "Small area, size:"+ house.getSize()); }});Copy the code

As we can see from the code, we need to new a Func1 object to filter(), Func1

() the first is the type of data passed in by the observation sequence, and the second is a Boolean object that returns whether or not to filter. Return true if filter() is satisfied, false otherwise. And sends the data that returns true to the observer. Output result:
,>

The filter.120Flat House: Dongfang Garden, Size:144.8The filter.120Flat House: Dongfang Garden, Size:144.8The filter.120Flat House: Springtime, Madrid, Size:123.4The filter.120Flat House: Springtime, Madrid, Size:123.4The filter.120Flat house: Emgrand Home Community, size:188.7The filter.120Flat house: Emgrand Home Community, size:188.7The filter.120Flat house: Emgrand Home Community, size:188.7Copy the code

Schematic diagram:

In actual project development, the filter operator can be used to filter null values in data sets for convenience.

Take

The take(int count) operator can be used to capture and fire the first count elements of an observation sequence.

//take: get the first two cell names
Observable.from(communities)
        .take(2)
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                Log.e("rx_test"."Take: the first two neighborhoods:"+ community.getCommunityName()); }});Copy the code

Output result:

The Oriental Gardens take Madrid SpringCopy the code

Schematic diagram:

TakeLast

The takeLast(int count) operator, as its name implies, intercepts the last count elements in the observation sequence and fires.

//takeLast: get the last two cell names
Observable.from(communities)
        .takeLast(2)
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                Log.e("rx_test"."TakeLast: The last two communities:"+ community.getCommunityName()); }});Copy the code

Output result:

TakeLast: Last two neighborhoods: Madrid Spring takeLast: last two neighborhoods: Emgrand HomeCopy the code

Schematic diagram:

TakeUntil

The takeUntil operator has two types of input arguments. 1. TakeUntil (Observable) subscribe and start launching the original Observable while monitoring the second Observable we provide. If the second Observable emits data or emits a termination notification, the Observable returned by takeUntil() stops emitting the original Observable and terminates.

//observableA emits a Long increment every 300ms
//observableB emits a Long increment every 800ms
Observable<Long> observableA = Observable.interval(300, TimeUnit.MILLISECONDS);
Observable<Long> observableB = Observable.interval(800, TimeUnit.MILLISECONDS);
observableA.takeUntil(observableB)
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted(a) {
                Log.e("rx_test"."TakeUntil (observables) :" + "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("rx_test"."TakeUntil (Observable) : onError:" + e.getMessage());
            }

            @Override
            public void onNext(Long aLong) {
                Log.e("rx_test"."TakeUntil (Observable) : onNext:"+ aLong); }});Copy the code

Output result:

TakeUntil (observables) : onNext:0TakeUntil (observables) : onNext:1TakeUntil (observables) : onCompletedCopy the code

As you can see from the output, after subscribing, observableA fires 0,1 and then the onCompleted flag stops. This is because observableA fires every 300ms. After 1 is fired, 600ms has elapsed, and observableB starts to emit data at 800ms. TakeUntil takes effect interrupts observableA’s firing. Schematic diagram:

2. TakeUntil (Func1) judge whether to suspend transmitting data by the call() method in Func1 passed in.

//takeUntil: Filters with flatMap until prices are greater than 500 and interrupts the current Observable launching House
Observable.from(communities)
        .flatMap(new Func1<Community, Observable<House>>() {
            @Override
            public Observable<House> call(Community community) {
                return Observable.from(community.getHouses());
            }
        })
        .takeUntil(new Func1<House, Boolean>() {
            @Override
            public Boolean call(House house) {
                return house.getPrice() > 500;
            }
        })
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                Log.e("rx_test"."TakeUntil: stop launching when greater than 500:" + house.getCommunityName() + "Neighborhood, house price:"+ house.getPrice()); }});Copy the code

Output result:

TakeUntil (Func1) : more than500Dongfang Garden Housing Estate, Room rates:200TakeUntil (Func1) : more than500Dongfang Garden Housing Estate, Room rates:520Copy the code

Schematic diagram:

TakeWhile

The takeWhile operator is similar to takeUntil(Func1), except that takeWhile() terminates Observable emission when the data emitted by the Observable does not meet the criteria.

//takeWhile: Abort launch when launch data equals 3
Observable.just(1.2.3.4.5)
        .takeWhile(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                returninteger ! =3;
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e("rx_test"."takeWhile:"+ integer); }});Copy the code

Output result:

TakeWhile:1TakeWhile:2Copy the code

Schematic diagram:

Skip

Skip (int count) operator to ignore the first count item of the launch observation sequence.

// Ignore the first two cells
Observable.from(communities)
        .skip(2)
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                Log.e("rx_test"."Skip: ignore the first two cells:"+ community.getCommunityName()); }});Copy the code

Output result:

Skip: The first two communities: Emgrand HomesCopy the code

Schematic diagram:

SkipLast

SkipLast (int count) operator that ignores the last count item of the launch observation sequence.

// Ignore the last two cell data
Observable.from(communities)
        .skipLast(2)
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                Log.e("rx_test"."Skip: Ignore the last two cells:"+ community.getCommunityName()); }});Copy the code

Output result:

Ignore the last two neighborhoods: Oriental GardenCopy the code

Schematic diagram:

SkipUntil

The skipUntil operator, as opposed to takeUntil(). Subscribe and start launching the original Observable, while monitoring the second Observable we provide. If the second Observable emits a data item or a termination notification, the Observable returned by skipUntil() does not emit data until the previous item is ignored. Schematic diagram:

SkipWhile

The skipWhile operator, in contrast to takeWhile, starts emitting data when the Observable emits data that does not meet the criteria, ignoring previous data items. Schematic diagram:

Debounce

The debounce operator has two types of input arguments. 1. Debounce (long, TimeUnit) filters data that is transmitted too quickly by an Observable, limiting traffic. The first parameter is the current limiting time, and the second parameter is the time unit.

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        try {
            for (int i = 1; i < 10; i++) {
                subscriber.onNext(i);
                Thread.sleep(i * 100); // delay 100,200,300,400,500...... 900ms transmitting data
            }
            subscriber.onCompleted();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}).subscribeOn(Schedulers.newThread())
        .debounce(400, TimeUnit.MILLISECONDS)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted(a) {
                Log.e("rx_test"."debounce:" + "onCompleted");
            }

            @Override
            public void onError(Throwable e) {}@Override
            public void onNext(Integer integer) {
                Log.e("rx_test"."debounce:"+ integer); }});Copy the code

Output result:

Debounce:5Debounce:6Debounce:7Debounce:8Debounce:9Debounce: onCompletedCopy the code

It can be seen from the output result that since the current limiting time is set at 500ms, 1-4 is not transmitted but filtered. Note that if the last result produced by the source Observable calls onCompleted within the time limit, the debounce operator also delivers the result to the subscriber. Schematic diagram:

2. Debounce (Func1) filters according to the function in the call method of Func1. The call method in Func1 returns a temporary Observable, and if the original Observable sends a new data before the temporary Observable generated by Func1’s call method is finished, the data is filtered out. Schematic diagram:

Distinct

1. Distinct () only allows data that have not been transmitted to pass through, thus achieving the function of removing duplicate items in the sequence.

// Remove duplicate numbers
Observable.just(1.2.2.3.4.5.6.6.6.7)
        .distinct()
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e("rx_test"."Distinct:"+ integer); }});Copy the code

Output result:

-Blair: Distinct:1-Blair: Distinct:2-Blair: Distinct:3-Blair: Distinct:4-Blair: Distinct:5-Blair: Distinct:6-Blair: Distinct:7Copy the code

It can be seen from the output that duplicate 2 and 6 are filtered. Schematic diagram:

Distinct (Func1) performs deduplication according to the call method in Func1. The call method generates a Key according to the value emitted by Observable and compares the Key to judge whether the two data are the same. If it is a duplicate, duplicate items are filtered like distinct().

// Remove the houses of the same size in each community according to a certain attribute
Observable.from(communities)
        .flatMap(new Func1<Community, Observable<House>>() {
            @Override
            public Observable<House> call(Community community) {
                return Observable.from(community.getHouses());
            }
        })
        .distinct(new Func1<House, Float>() {
            @Override
            public Float call(House house) {
                return house.getSize();
            }
        })
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                Log.e("rx_test"."Distinct (Func1) : + house.getCommunityName() + "Small area, size:"+ house.getSize()); }});Copy the code

Output result:

Distinct (Func1) : Oriental Garden district, size:105.6Distinct (Func1) : Oriental Garden district, size:144.8A distinct(Func1) neighborhood in the Spring of Madrid88.6A distinct(Func1) neighborhood in the Spring of Madrid123.4Distinct (Func1) : The residential area of imperial Palace, size:188.7Distinct (Func1) : The residential area of imperial Palace, size:56.4Copy the code

DistinctUntilChanged

1. DistinctUntilChanged () indicates whether the current data item is the same as the previous item.

// Go ahead and repeat the data
Observable.just(1.2.2.3.4.2.3.5.5)
        .distinctUntilChanged()
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e("rx_test"."DistinctUntilChanged: forward reload:"+ integer); }});Copy the code

Output result:

DistinctUntilChanged:1DistinctUntilChanged:2DistinctUntilChanged:3DistinctUntilChanged:4DistinctUntilChanged:2DistinctUntilChanged:3DistinctUntilChanged:5Copy the code

Schematic diagram:

DistinctUntilChanged (Func1) is similar to distinct(Func1) in that the call method in Func1 produces a key to determine whether two adjacent items are the same.

// Remove the houses with the same name in each community by moving forward according to a property
Observable.from(communities)
        .flatMap(new Func1<Community, Observable<House>>() {
            @Override
            public Observable<House> call(Community community) {
                return Observable.from(community.getHouses())
                .distinctUntilChanged(new Func1<House, String>() {
                    @Override
                    public String call(House house) {
                        returnhouse.getCommunityName(); }}); } }) .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                Log.e("rx_test"."DistinctUntilChanged (Func1) : forward to heavy:" + house.getCommunityName() + "Small area, size:"+ house.getSize()); }});Copy the code

Output result:

DistinctUntilChanged (Func1) : Forward105.6DistinctUntilChanged (Func1) : Forward to Heavy: Madrid Spring Neighborhood, size:88.6DistinctUntilChanged (Func1) : Forward to heavy: Emgrand Homeland Community, size:188.7Copy the code

ElementAt

The elementAt(int index) operator gets the index of the index item in the observation sequence and transmits it as the unique data to the observer, starting at 0.

Observable.from(communities)
        .elementAt(1)
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                Log.e("rx_test"."ElementAt: second cell:"+ community.getCommunityName()); }});Copy the code

Output result:

ElementAt MadridCopy the code

Schematic diagram:

First

1. First () emits only the first data item in the observation sequence.

Observable.from(communities)
        .first()
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                Log.e("rx_test"."first:"+ community.getCommunityName()); }});Copy the code

Output result:

First: Oriental GardenCopy the code

Schematic diagram:

2. First (Func1) According to the condition of call method in Func1, emit the first data item that meets the condition.

// Filter out the first community named Madrid Spring
Observable.from(communities)
        .first(new Func1<Community, Boolean>() {
            @Override
            public Boolean call(Community community) {
                return Madrid Spring.equals(community.getCommunityName());
            }
        })
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                Log.e("rx_test"."The first (Func1) :"+ community.getCommunityName()); }});Copy the code

Output result:

First (Func1) : Madrid SpringCopy the code

Last

1. Last () emits only the last data item in the observation sequence.

// Send the last data item
Observable.from(communities)
        .last()
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                Log.e("rx_test"."The last."+ community.getCommunityName()); }});Copy the code

Output result:

Last: Emgrand HomeCopy the code

Schematic diagram:

2. Last (Func1) Emits the last data item that meets the condition according to the condition of the call method in Func1.

// Send the last data item that meets the criteria: filter the last housing in the community named Madrid Spring
Observable.from(communities)
        .flatMap(new Func1<Community, Observable<House>>() {
            @Override
            public Observable<House> call(Community community) {
                return Observable.from(community.getHouses());
            }
        })
        .last(new Func1<House, Boolean>() {
            @Override
            public Boolean call(House house) {
                return Madrid Spring.equals(house.getCommunityName());
            }
        })
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                Log.e("rx_test"."The last." + house.getCommunityName() + "Small area, size:"+ house.getSize()); }});Copy the code

Output result:

Last: Springtime Madrid, Size:88.6Copy the code

conclusion

This article concludes with an overview of the common filter class operators in RxJava. In the next article, we will take a look at the combined operators of the four RxJava class operators and how to use them. If you have any doubts or suggestions, you can also put forward them in the project Issues of RxJavaDemo on Github. I will reply in time. Attached is the address of RxJavaDemo: RxJavaDemo