Preface: articles in the Rx series
- A first look at RxJava
- RxJava overview
- RxJava operators
- Creating Observables
- Creating Observers
- Various ways to unsubscribe in RxJava
new Observer
Observable.just("a").subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {}

    @Override
    public void onNext(String s) {}

    @Override
    public void onError(Throwable e) {}

    @Override
    public void onComplete() {}
});
new Consumer
Observable.just("a").subscribe(new Consumer<String>() {
    // Called once per emitted item; only onNext is handled here.
    @Override
    public void accept(String s) throws Exception {}
});
new Subscriber
Supports backpressure.
Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
    }
}, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
        // No items are delivered until s.request(n) is called.
    }

    @Override
    public void onNext(String s) {}

    @Override
    public void onError(Throwable t) {}

    @Override
    public void onComplete() {}
});
new FlowableSubscriber
FlowableSubscriber is a Subscriber variant added as of RxJava 2.2; it differs from the Subscriber in RxJava 1.0.
Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
        // Emit far more items than the subscriber will request.
        for (int i = 0; i < 1000000; i++) {
            emitter.onNext("" + i);
        }
    }
}, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new FlowableSubscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                // Ask for only the first 10 items.
                s.request(10);
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }

            @Override
            public void onError(Throwable t) {}

            @Override
            public void onComplete() {}
        });
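To make the effect of s.request(10) easier to see without an Android project, here is a minimal sketch that uses the JDK's own java.util.concurrent.Flow interfaces (Java 9+) instead of RxJava. RangePublisher, RequestDemo and collect are hypothetical names invented for this illustration, and the publisher is deliberately simplified: it is synchronous, single-threaded, and ignores the Reactive Streams rules for non-positive requests and re-entrant calls. The idea it shows is the same, though: the subscriber receives only as many items as it has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class RequestDemo {

    /** Hypothetical publisher: emits "0", "1", ... up to count - 1, but only on demand. */
    static final class RangePublisher implements Flow.Publisher<String> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Deliver at most n items, and never more than remain.
                    for (long i = 0; i < n && next < count && !done; i++) {
                        subscriber.onNext("" + next++);
                    }
                    // Complete once every item has been delivered.
                    if (next == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with a single fixed request and returns the items that arrived. */
    public static List<String> collect(int total, long demand) {
        List<String> received = new ArrayList<>();
        new RangePublisher(total).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(demand); // same role as s.request(10) above
            }

            @Override
            public void onNext(String item) {
                received.add(item);
            }

            @Override
            public void onError(Throwable t) {}

            @Override
            public void onComplete() {}
        });
        return received;
    }

    public static void main(String[] args) {
        // One million items are available, but only 10 were requested.
        System.out.println(collect(1000000, 10).size()); // prints 10
    }
}
```

One difference from the RxJava example is worth keeping in mind: this sketch simply never produces the items that were not requested, whereas Flowable.create with BackpressureStrategy.BUFFER lets the emitter keep calling onNext and holds the undelivered items in a buffer. In both cases the subscriber's onNext runs only ten times.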