Basic API supplement
1. Partial callbacks: the call() method of Action1
Observable<String> observable = Observable.just("hello", "world!");
// The way we wrote it before
// observable.subscribe(new Observer<String>() {
//
// @Override
// public void onCompleted() {
//
// }
//
// @Override
// public void onError(Throwable e) {
//
// }
//
// @Override
// public void onNext(String t) {
// Log.i("main", "value: " + t);
// }
// });
observable.subscribe(new Action1<String>() {
    /** Equivalent to onNext. */
    @Override
    public void call(String t) {
        Log.e("main", "Value: " + t);
    }
});
// Other overloads:
// observable.subscribe(onNext, onError);
// observable.subscribe(onNext, onError, onCompleted);
Result output:
08-07 02:46:32.001 4533-4533/com.haocai.architect.rxjava E/main: Value: hello
08-07 02:46:32.001 4533-4533/com.haocai.architect.rxjava E/main: Value: world!
call() is equivalent to the onNext() method.
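The relationship between the single-callback form and the full Observer can be pictured with a small plain-Java model. This is only a sketch and not RxJava's own code: MiniObserver, MiniAction, fromAction and deliver are hypothetical names, and it assumes the RxJava 1.x behavior in which subscribe(Action1) leaves onCompleted empty and treats an unhandled error as fatal.

```java
import java.util.ArrayList;
import java.util.List;

final class ActionAdapterSketch {
    /** Hypothetical stand-in for a full three-callback observer. */
    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical stand-in for a single-method callback such as Action1. */
    interface MiniAction<T> {
        void call(T value);
    }

    /** Wraps a single callback so it can be used where a full observer is expected. */
    static <T> MiniObserver<T> fromAction(final MiniAction<T> action) {
        return new MiniObserver<T>() {
            @Override
            public void onNext(T value) {
                action.call(value); // call() plays the role of onNext
            }

            @Override
            public void onError(Throwable error) {
                // Assumption: with no error callback supplied, the error is rethrown.
                throw new IllegalStateException("onError not implemented", error);
            }

            @Override
            public void onCompleted() {
                // Nothing to do: no completion callback was supplied.
            }
        };
    }

    /** Pushes the values through the wrapped callback and returns what it received. */
    static List<String> deliver(String... values) {
        final List<String> received = new ArrayList<String>();
        MiniObserver<String> observer = fromAction(new MiniAction<String>() {
            @Override
            public void call(String value) {
                received.add(value);
            }
        });
        for (String value : values) {
            observer.onNext(value);
        }
        observer.onCompleted();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliver("hello", "world!"));
    }
}
```

The practical consequence is that the one-argument form is convenient for quick experiments, while the two- and three-argument overloads are the safer choice whenever the source can fail.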
2. Filter function
(1) filter
filter(Func1) removes the values we do not want from the observed sequence and emits only the values that satisfy the condition. Let's look at the schematic diagram:
public class FilterActivity extends Activity {
    private Observable<AppInfo> observable;
    private AppInfoAdapter appInfoAdapter;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_simple9);
        observable = getApps();
        initView();
    }
    private void initView() {
        ListView listView = (ListView) findViewById(R.id.lv_app_name);
        appInfoAdapter = new AppInfoAdapter(this);
        listView.setAdapter(appInfoAdapter);
    }
    /** Creates the Observable. */
    private Observable<AppInfo> getApps() {
        AppInfo appInfo1 = new AppInfo("Xiong", 0);
        AppInfo appInfo2 = new AppInfo("Tony", 0);
        AppInfo appInfo3 = new AppInfo("Tomcat", 0);
        AppInfo appInfo4 = new AppInfo("Lucy", 0);
        AppInfo appInfo5 = new AppInfo("Lucy pioneer", 0);
        // Keep only the items whose name contains "Lucy"
        return Observable
                .just(appInfo1, appInfo2, appInfo3, appInfo4, appInfo5)
                .filter(new Func1<AppInfo, Boolean>() {
                    @Override
                    public Boolean call(AppInfo t) {
                        return t.getName().contains("Lucy");
                    }
                });
    }
    public void click(View v) {
        observable.subscribe(new Observer<AppInfo>() {
            @Override
            public void onCompleted() {
                // Refresh the UI when done
                appInfoAdapter.notifyDataSetChanged();
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(AppInfo t) {
                Log.e("main", t.getName());
                // Add data
                appInfoAdapter.addAppInfo(t);
            }
        });
    }
}
Result output:
08-07 03:19:10.492 32521-32521/com.haocai.architect.rxjava E/main: Lucy
08-07 03:19:10.492 32521-32521/com.haocai.architect.rxjava E/main: Lucy pioneer
Filter source code:
public final class OnSubscribeFilter<T> implements OnSubscribe<T> {
    final Observable<T> source;
    final Func1<? super T, Boolean> predicate;
    public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) {
        this.source = source;
        this.predicate = predicate;
    }
    @Override
    public void call(final Subscriber<? super T> child) {
        FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
        child.add(parent);
        source.unsafeSubscribe(parent);
    }
    static final class FilterSubscriber<T> extends Subscriber<T> {
        final Subscriber<? super T> actual;
        final Func1<? super T, Boolean> predicate;
        boolean done;
        public FilterSubscriber(Subscriber<? super T> actual, Func1<? super T, Boolean> predicate) {
            this.actual = actual;
            this.predicate = predicate;
            request(0);
        }
        @Override
        public void onNext(T t) {
            boolean result;
            try {
                result = predicate.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            if (result) {
                actual.onNext(t);
            } else {
                request(1);
            }
        }
        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;
            actual.onError(e);
        }
        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }
        @Override
        public void setProducer(Producer p) {
            super.setProducer(p);
            actual.setProducer(p);
        }
    }
}
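The decision made in FilterSubscriber.onNext above can be modeled without RxJava. The following is a minimal plain-Java sketch, assuming a finite list in place of an Observable; FilterSketch, Result and replacementRequests are hypothetical names. It forwards an item when the predicate accepts it and otherwise counts one replacement request, which mirrors the request(1) call in the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class FilterSketch {
    /** Items forwarded downstream plus the number of replacement requests sent upstream. */
    static final class Result<T> {
        final List<T> forwarded = new ArrayList<>();
        int replacementRequests;
    }

    static <T> Result<T> filter(List<T> source, Predicate<? super T> predicate) {
        Result<T> result = new Result<>();
        for (T item : source) {
            if (predicate.test(item)) {
                result.forwarded.add(item);      // corresponds to actual.onNext(t)
            } else {
                result.replacementRequests++;    // corresponds to request(1)
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Xiong", "Tony", "Tomcat", "Lucy", "Lucy pioneer");
        Result<String> result = filter(names, name -> name.contains("Lucy"));
        System.out.println(result.forwarded + ", replacement requests: " + result.replacementRequests);
    }
}
```

The replacement request matters under backpressure: a dropped item would otherwise consume one unit of downstream demand without ever being delivered.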
(2) take (emit only the first n items)
/** Creates the Observable. */
private Observable<AppInfo> getApps() {
    AppInfo appInfo1 = new AppInfo("Xiong", 0);
    AppInfo appInfo2 = new AppInfo("Tony", 0);
    AppInfo appInfo3 = new AppInfo("Tomcat", 0);
    AppInfo appInfo4 = new AppInfo("Lucy", 0);
    AppInfo appInfo5 = new AppInfo("Lucy pioneer", 0);
    // Take only the first two items
    return Observable
            .just(appInfo1, appInfo2, appInfo3, appInfo4, appInfo5)
            .take(2);
}
public void click(View v) {
    observable.subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            // Refresh the UI when done
            appInfoAdapter.notifyDataSetChanged();
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(AppInfo t) {
            Log.e("main", t.getName());
            // Add data
            appInfoAdapter.addAppInfo(t);
        }
    });
}
Result output:
08-07 05:54:16.887 12684-12684/com.haocai.architect.rxjava E/main: Xiong
08-07 05:54:16.887 12684-12684/com.haocai.architect.rxjava E/main: Tony
take source code:
public final class OperatorTake<T> implements Operator<T, T> {
    final int limit;
    public OperatorTake(int limit) {
        if (limit < 0) {
            throw new IllegalArgumentException("limit >= 0 required but it was " + limit);
        }
        this.limit = limit;
    }
    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        final Subscriber<T> parent = new Subscriber<T>() {
            int count;
            boolean completed;
            @Override
            public void onCompleted() {
                if (!completed) {
                    completed = true;
                    child.onCompleted();
                }
            }
            @Override
            public void onError(Throwable e) {
                if (!completed) {
                    completed = true;
                    try {
                        child.onError(e);
                    } finally {
                        unsubscribe();
                    }
                }
            }
            @Override
            public void onNext(T i) {
                if (!isUnsubscribed() && count++ < limit) {
                    boolean stop = count == limit;
                    child.onNext(i);
                    if (stop && !completed) {
                        completed = true;
                        try {
                            child.onCompleted();
                        } finally {
                            unsubscribe();
                        }
                    }
                }
            }
            /**
             * We want to adjust the requested values based on the `take` count.
             */
            @Override
            public void setProducer(final Producer producer) {
                child.setProducer(new Producer() {
                    // keeps track of requests up to maximum of `limit`
                    final AtomicLong requested = new AtomicLong(0);
                    @Override
                    public void request(long n) {
                        if (n > 0 && !completed) {
                            // because requests may happen concurrently use a CAS loop to
                            // ensure we only request as much as needed, no more no less
                            while (true) {
                                long r = requested.get();
                                long c = Math.min(n, limit - r);
                                if (c == 0) {
                                    break;
                                } else if (requested.compareAndSet(r, r + c)) {
                                    producer.request(c);
                                    break;
                                }
                            }
                        }
                    }
                });
            }
        };
        if (limit == 0) {
            child.onCompleted();
            parent.unsubscribe();
        }
        /*
         * We decouple the parent and child subscription so there can be multiple take() in a chain such as for
         * the groupBy Observer use case where you may take(1) on groups and take(20) on the children.
         *
         * Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM.
         *
         * However, if we receive an unsubscribe from the child we still want to propagate it upwards so we
         * register 'parent' with 'child'
         */
        child.add(parent);
        return parent;
    }
}
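The request-capping loop inside setProducer is the least obvious part of the listing above, so here it is isolated as a plain-Java sketch. CappedRequestSketch is a hypothetical name, and instead of calling producer.request(c) the method simply returns the amount that would be passed upstream.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CappedRequestSketch {
    private final long limit;
    // Total amount already requested from upstream, never more than limit.
    private final AtomicLong requested = new AtomicLong(0);

    CappedRequestSketch(long limit) {
        this.limit = limit;
    }

    /** Returns how many of the n requested items would actually be requested upstream. */
    long request(long n) {
        if (n <= 0) {
            return 0;
        }
        while (true) {
            long r = requested.get();
            long c = Math.min(n, limit - r);   // never exceed the remaining budget
            if (c == 0) {
                return 0;                      // budget exhausted
            }
            if (requested.compareAndSet(r, r + c)) {
                return c;                      // this thread won the race for c items
            }
            // Another thread changed the counter first: re-read and retry.
        }
    }

    public static void main(String[] args) {
        CappedRequestSketch capped = new CappedRequestSketch(2);
        System.out.println(capped.request(1)); // within budget
        System.out.println(capped.request(5)); // trimmed to the remaining budget
        System.out.println(capped.request(3)); // nothing left
    }
}
```

With a limit of 2, a downstream request for 1 item followed by a request for 5 results in upstream requests of 1 and 1; any later request is dropped.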
(3) takeLast (emit only the last n items)
/** Creates the Observable. */
private Observable<AppInfo> getApps() {
    AppInfo appInfo1 = new AppInfo("Xiong", 0);
    AppInfo appInfo2 = new AppInfo("Tony", 0);
    AppInfo appInfo3 = new AppInfo("Tomcat", 0);
    AppInfo appInfo4 = new AppInfo("Lucy", 0);
    AppInfo appInfo5 = new AppInfo("Lucy pioneer", 0);
    // Take only the last two items
    return Observable
            .just(appInfo1, appInfo2, appInfo3, appInfo4, appInfo5)
            .takeLast(2);
}
public void click(View v) {
    observable.subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            // Refresh the UI when done
            appInfoAdapter.notifyDataSetChanged();
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(AppInfo t) {
            Log.e("main", t.getName());
            // Add data
            appInfoAdapter.addAppInfo(t);
        }
    });
}
Result output:
08-07 06:16:08.483 32192-32192/com.haocai.architect.rxjava E/main: Lucy
08-07 06:16:08.483 32192-32192/com.haocai.architect.rxjava E/main: Lucy pioneer
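Unlike take, takeLast cannot emit anything until the source has finished, because only then is it known which items are the last ones. One common way to implement this is a bounded buffer that drops its oldest entry whenever it is full. The sketch below is plain Java under that assumption and is not the RxJava source; TakeLastSketch is a hypothetical name and a list stands in for the Observable.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TakeLastSketch {
    /** Returns the last count items of source, in their original order. */
    static <T> List<T> takeLast(List<T> source, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count >= 0 required but it was " + count);
        }
        if (count == 0) {
            return new ArrayList<>();
        }
        ArrayDeque<T> buffer = new ArrayDeque<>();
        for (T item : source) {
            if (buffer.size() == count) {
                buffer.pollFirst();        // full: forget the oldest item
            }
            buffer.addLast(item);
        }
        // Only now, after the source is exhausted, can the buffered items be emitted.
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Xiong", "Tony", "Tomcat", "Lucy", "Lucy pioneer");
        System.out.println(takeLast(names, 2));
    }
}
```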
(4) Distinct (remove duplicate items)
private Observable<String> getApps() {
    // Emit each distinct value only once
    return Observable.just("Tony", "pioneer", "Tomcat", "Tony", "Lucy", "Tomcat", "Tony").distinct();
}
public void click(View v) {
    observable.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
            // Refresh the UI when done
            appInfoAdapter.notifyDataSetChanged();
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(String t) {
            Log.e("main", t);
        }
    });
}
Result output:
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Tony
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: pioneer
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Tomcat
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Lucy
(5) distinctUntilChanged (remove consecutive duplicates)
/** Creates the Observable. */
private Observable<String> getApps() {
    list = new ArrayList<String>();
    list.add("Michael");
    list.add("Michael");
    list.add("pioneer");
    list.add("Michael");
    list.add("Michael");
    list.add("Huni");
    list.add("Huni");
    list.add("Huni");
    list.add("King");
    list.add("Huni");
    return Observable.from(list).distinctUntilChanged();
}
public void click(View v) {
    observable.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(String t) {
            Log.e("main", "Filtered value: " + t);
        }
    });
}
Result output:
08-07 07:46:45.444 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: Michael
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: pioneer
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: Michael
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: Huni
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: King
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: Filtered value: Huni
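The difference between the two operators comes down to what each one remembers: distinct remembers every value it has seen, while distinctUntilChanged remembers only the previous value. A minimal plain-Java sketch of both, with a list standing in for the Observable (DistinctSketch is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

final class DistinctSketch {
    /** Keeps the first occurrence of every value. */
    static <T> List<T> distinct(List<T> source) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (seen.add(item)) {          // add() returns false for a value already seen
                out.add(item);
            }
        }
        return out;
    }

    /** Drops a value only when it equals the value immediately before it. */
    static <T> List<T> distinctUntilChanged(List<T> source) {
        List<T> out = new ArrayList<>();
        boolean hasPrevious = false;
        T previous = null;
        for (T item : source) {
            if (!hasPrevious || !Objects.equals(previous, item)) {
                out.add(item);
            }
            previous = item;
            hasPrevious = true;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Michael", "Michael", "pioneer", "Michael", "Michael",
                "Huni", "Huni", "Huni", "King", "Huni");
        System.out.println(distinct(names));
        System.out.println(distinctUntilChanged(names));
    }
}
```

Because distinct has to keep every value it has seen, its memory use grows with the number of distinct values, whereas distinctUntilChanged needs constant space.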
(6) First
As the name suggests, first() emits only the first item in the Observable sequence.
private Observable<String> getApps() {
    list = new ArrayList<String>();
    list.add("Michael");
    list.add("pioneer");
    list.add("Huni");
    list.add("King");
    list.add("Cookie");
    // first: emit the first item in the sequence (internally take(1).single())
    // last: emit the last item in the sequence (internally takeLast(1).single())
    return Observable.from(list).first();
    // return Observable.from(list).last();
}
public void click(View v) {
    observable.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(String t) {
            Log.e("main", "Filtered value: " + t);
        }
    });
}
Result output:
08-07 08:30:57.648 25076-25076/com.haocai.architect.rxjava E/main: Filtered value: Michael
(7) Last
last() emits only the last item in the observed sequence.
private Observable<String> getApps() {
    list = new ArrayList<String>();
    list.add("Michael");
    list.add("pioneer");
    list.add("Huni");
    list.add("King");
    list.add("Cookie");
    // first: emit the first item in the sequence (internally take(1).single())
    // last: emit the last item in the sequence (internally takeLast(1).single())
    return Observable.from(list).last();
}
Result output:
08-07 08:30:57.648 25076-25076/com.haocai.architect.rxjava E/main: Filtered value: Cookie
(8) Skip
skip(int) lets us ignore the first n items emitted by an Observable.
/** Creates the Observable. */
private Observable<String> getApps() {
    list = new ArrayList<String>();
    list.add("Michael");
    list.add("Pioneer");
    list.add("Huni");
    list.add("King");
    list.add("Cookie");
    list.add("Faker");
    list.add("Gigi");
    // skip: ignore the first n items, then emit the rest
    // skipLast: ignore the last n items
    return Observable.from(list).skip(2);
}
public void click(View v) {
    observable.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(String t) {
            Log.i("main", "Filtered value: " + t);
        }
    });
}
Result output:
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: Filtered value: Huni
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: Filtered value: King
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: Filtered value: Cookie
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: Filtered value: Faker
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: Filtered value: Gigi
(9) SkipLast
skipLast(int) ignores the last n items emitted by an Observable.
/** Creates the Observable. */
private Observable<String> getApps() {
    list = new ArrayList<String>();
    list.add("Michael");
    list.add("Pioneer");
    list.add("Huni");
    list.add("King");
    list.add("Cookie");
    list.add("Faker");
    list.add("Gigi");
    // skip: ignore the first n items, then emit the rest
    // skipLast: ignore the last n items
    return Observable.from(list).skipLast(2);
}
Result output:
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: Filtered value: Michael
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: Filtered value: Pioneer
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: Filtered value: Huni
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: Filtered value: King
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: Filtered value: Cookie
(10) ElementAt
elementAt(int) picks the item at the given zero-based index from the sequence emitted by the source Observable and emits it as the only item.
private Observable<String> getApps() {
    list = new ArrayList<String>();
    list.add("Michael");
    list.add("Pioneer");
    list.add("Huni");
    list.add("King");
    list.add("Cookie");
    list.add("Faker");
    list.add("Gigi");
    // elementAt: emit only the item at index 2 (the third item)
    return Observable.from(list).elementAt(2);
}
Result output:
08-07 09:01:17.495 20645-20645/com.haocai.architect.rxjava I/main: Filtered value: Huni
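For a finite sequence, skip, skipLast and elementAt all reduce to simple index arithmetic, which makes the three results above easy to check by hand. The sketch below is plain Java with a list in place of the Observable; PositionSketch is a hypothetical name and n is assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PositionSketch {
    /** Everything except the first n items. */
    static <T> List<T> skip(List<T> source, int n) {
        int from = Math.min(n, source.size());
        return new ArrayList<>(source.subList(from, source.size()));
    }

    /** Everything except the last n items. */
    static <T> List<T> skipLast(List<T> source, int n) {
        int to = Math.max(0, source.size() - n);
        return new ArrayList<>(source.subList(0, to));
    }

    /** The single item at the given zero-based index. */
    static <T> T elementAt(List<T> source, int index) {
        return source.get(index);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Michael", "Pioneer", "Huni", "King", "Cookie", "Faker", "Gigi");
        System.out.println(skip(names, 2));
        System.out.println(skipLast(names, 2));
        System.out.println(elementAt(names, 2));
    }
}
```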
(11) Sample
The sample operator periodically checks the items emitted by the source Observable and, at the end of each sampling interval, emits the most recent item emitted during that interval.
Example 1
Observable.interval(1, TimeUnit.SECONDS).sample(2, TimeUnit.SECONDS).subscribe(
        new Observer<Long>() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(Long t) {
                Log.i("main", "Received value: " + t);
            }
        });
Result output:
08-07 09:36:08.478 20117-20195/com.haocai.architect.rxjava I/main: Received value: 0
08-07 09:36:10.477 20117-20195/com.haocai.architect.rxjava I/main: Received value: 2
08-07 09:36:12.478 20117-20195/com.haocai.architect.rxjava I/main: Received value: 4
08-07 09:36:14.479 20117-20195/com.haocai.architect.rxjava I/main: Received value: 6
08-07 09:36:16.478 20117-20195/com.haocai.architect.rxjava I/main: Received value: 8
...
Example 2
Observable.create(subscriber -> {
    subscriber.onNext(1);
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        throw Exceptions.propagate(e);
    }
    subscriber.onNext(2);
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        throw Exceptions.propagate(e);
    }
    subscriber.onNext(3);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw Exceptions.propagate(e);
    }
    subscriber.onNext(4);
    subscriber.onNext(5);
    subscriber.onCompleted();
}).sample(999, TimeUnit.MILLISECONDS) // or throttleLast(1000, TimeUnit.MILLISECONDS)
        .subscribe(item -> Log.d("JG", item.toString()));
// The result is 2, 3, 5
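The result of Example 2 can be worked out by hand from the emission times: 1 at 0 ms, 2 at 500 ms, 3 at 1000 ms, then 4 and 5 at 2000 ms. The sketch below replays that timeline in plain Java. It is a simulation and not RxJava code: SampleSketch and Event are hypothetical names, and it assumes that the source completes right after its last item and that the most recent unsampled item is emitted on completion, as the result 2, 3, 5 suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SampleSketch {
    /** A value together with the time, in milliseconds after subscription, at which it is emitted. */
    static final class Event {
        final long timeMs;
        final int value;

        Event(long timeMs, int value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    /** Replays time-ordered events against a sampling timer that ticks every periodMs. */
    static List<Integer> sample(List<Event> events, long periodMs) {
        List<Integer> emitted = new ArrayList<>();
        Integer latest = null;             // most recent item not yet sampled
        long nextTick = periodMs;
        for (Event event : events) {
            // Fire every timer tick that happens before this event arrives.
            while (nextTick < event.timeMs) {
                if (latest != null) {
                    emitted.add(latest);
                    latest = null;
                }
                nextTick += periodMs;
            }
            latest = event.value;
        }
        // Assumption: on completion the pending item, if any, is emitted.
        if (latest != null) {
            emitted.add(latest);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(0, 1), new Event(500, 2), new Event(1000, 3),
                new Event(2000, 4), new Event(2000, 5));
        System.out.println(sample(events, 999));
    }
}
```

The tick at 999 ms sees 2 as the latest item, the tick at 1998 ms sees 3, and 5 is flushed when the source completes; 1 and 4 are each overwritten before a tick can pick them up.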
(12) Timeout
If the source Observable emits no item within the specified period, timeout either signals an error or switches to a fallback Observable.
private Observable<String> getApps() {
    observable = Observable
            .create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> observer) {
                    observer.onNext("Kpioneer");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw Exceptions.propagate(e);
                    }
                    observer.onNext("Lucy");
                    observer.onCompleted();
                }
            });
    return observable;
}
public void click(View v) {
    observable.timeout(999, TimeUnit.MILLISECONDS, Observable.just("Michel", "QQ")).subscribe(
            new Observer<String>() {
                @Override
                public void onCompleted() {
                }
                @Override
                public void onError(Throwable e) {
                }
                @Override
                public void onNext(String t) {
                    Log.i("main", "Received value: " + t);
                }
            });
}
Result output:
08-07 10:02:30.806 11757-11757/com.haocai.architect.rxjava I/main: Received value: Kpioneer
08-07 10:02:31.808 11757-11824/com.haocai.architect.rxjava I/main: Received value: Michel
08-07 10:02:31.808 11757-11824/com.haocai.architect.rxjava I/main: Received value: QQ
If no fallback Observable is specified, the subscriber receives Kpioneer and then an onError call carrying a TimeoutException.
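Both outcomes follow from comparing the gap between consecutive items with the timeout window. The sketch below simulates that rule in plain Java. It is not RxJava code: TimeoutSketch and Event are hypothetical names, only gaps before items are checked (not the gap before completion), and the error case is represented by a marker string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TimeoutSketch {
    /** A value together with the time, in milliseconds after subscription, at which it is emitted. */
    static final class Event {
        final long timeMs;
        final String value;

        Event(long timeMs, String value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    /** Replays the events; fallback may be null, meaning "signal an error on timeout". */
    static List<String> timeout(List<Event> events, long timeoutMs, List<String> fallback) {
        List<String> received = new ArrayList<>();
        long lastActivityMs = 0;           // the timer starts at subscription
        for (Event event : events) {
            if (event.timeMs - lastActivityMs > timeoutMs) {
                if (fallback == null) {
                    received.add("onError(TimeoutException)"); // marker for the error signal
                } else {
                    received.addAll(fallback);                 // switch to the fallback source
                }
                return received;           // the original source is abandoned either way
            }
            received.add(event.value);
            lastActivityMs = event.timeMs;
        }
        return received;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(new Event(0, "Kpioneer"), new Event(1000, "Lucy"));
        System.out.println(timeout(events, 999, Arrays.asList("Michel", "QQ")));
        System.out.println(timeout(events, 999, null));
    }
}
```

Lucy arrives 1000 ms after Kpioneer, which is 1 ms too late for a 999 ms window, so it is never delivered in either variant.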
3. Transform operations
map() takes a parameter of type Func1 (its signature is map(Func1<? super T, ? extends R> func)) and applies that function to each value emitted by the Observable, converting it into the desired value. If this definition is hard to follow, take a look at the official schematic:
userModelList = new ArrayList<UserModel>();
for (int i = 0; i < 3; i++) {
    UserModel userModel = new UserModel("userId_" + i, "userName_" + i);
    List<OrderModel> orderList = new ArrayList<OrderModel>();
    for (int j = 0; j < 2; j++) {
        OrderModel orderModel = new OrderModel("userId_" + i
                + "_orderId_" + j, "user_" + i + "_orderName_" + j);
        orderList.add(orderModel);
    }
    userModel.setOrderList(orderList);
    userModelList.add(userModel);
}
// Convert each UserModel into its user name
Observable.from(userModelList).map(new Func1<UserModel, String>() {
    @Override
    public String call(UserModel userModel) {
        return userModel.getUserName();
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i("main", "Converted value: " + s);
    }
});
Result output:
08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: Converted value: userName_0
08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: Converted value: userName_1
08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: Converted value: userName_2
flatMap() works like this:
1. It uses the incoming event object to create an Observable.
2. Instead of sending that Observable on directly, it activates it so that it starts emitting events of its own.
3. The events emitted by every Observable created this way are merged into a single Observable, which is responsible for passing them to the Subscriber's callback methods.
These three steps split the event into two levels: the original object is "flattened" through a set of newly created Observables and then delivered down a single, unified path. This "flattening" is the "flat" in flatMap().
Finally, let's look at the schematic of flatMap:
As you may have noticed from the previous examples, flatMap() and map() both convert the value passed in and return another object. Unlike map(), however, flatMap() returns an Observable, and that Observable is not delivered directly to the Subscriber's callback methods.
userModelList = new ArrayList<UserModel>();
for (int i = 0; i < 3; i++) {
    UserModel userModel = new UserModel("userId_" + i, "userName_" + i);
    List<OrderModel> orderList = new ArrayList<OrderModel>();
    for (int j = 0; j < 2; j++) {
        OrderModel orderModel = new OrderModel("userId_" + i
                + "_orderId_" + j, "user_" + i + "_orderName_" + j);
        orderList.add(orderModel);
    }
    userModel.setOrderList(orderList);
    userModelList.add(userModel);
}
// flatMap is one way to handle one-to-many conversions such as user -> orders
// Typical scenario: nested API calls (for example, requests that must run after a successful login)
Observable.from(userModelList).flatMap(new Func1<UserModel, Observable<OrderModel>>() {
    @Override
    public Observable<OrderModel> call(UserModel userModel) {
        return Observable.from(userModel.getOrderList());
    }
}).subscribe(new Action1<OrderModel>() {
    @Override
    public void call(OrderModel orderModel) {
        Log.i("main", "Converted value: " + orderModel.getOrderId());
    }
});
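The one-to-one versus one-to-many distinction can also be seen with the map and flatMap methods of java.util.stream, which follow the same idea on ordinary collections. This is an analogy only: FlatMapSketch, User and Order are hypothetical stand-ins for the UserModel and OrderModel classes above, and a stream flatMap always preserves order, whereas RxJava's flatMap merges the inner Observables and may interleave their items when they emit asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class FlatMapSketch {
    static final class Order {
        final String orderId;

        Order(String orderId) {
            this.orderId = orderId;
        }
    }

    static final class User {
        final String userName;
        final List<Order> orders;

        User(String userName, List<Order> orders) {
            this.userName = userName;
            this.orders = orders;
        }
    }

    /** Builds three users with two orders each, using the same id pattern as the example above. */
    static List<User> sampleUsers() {
        List<User> users = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            List<Order> orders = new ArrayList<>();
            for (int j = 0; j < 2; j++) {
                orders.add(new Order("userId_" + i + "_orderId_" + j));
            }
            users.add(new User("userName_" + i, orders));
        }
        return users;
    }

    /** map: exactly one output per input. */
    static List<String> userNames(List<User> users) {
        return users.stream().map(user -> user.userName).collect(Collectors.toList());
    }

    /** flatMap: each input expands to a sequence, and the sequences are flattened into one. */
    static List<String> orderIds(List<User> users) {
        return users.stream()
                .flatMap(user -> user.orders.stream())
                .map(order -> order.orderId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<User> users = sampleUsers();
        System.out.println(userNames(users));
        System.out.println(orderIds(users));
    }
}
```

Three users produce three names with map, but six order ids with flatMap, because every user contributes its whole order list to the flattened result.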