Zero, sources

Source: Carson_Ho, Jianshu

First, basic knowledge

Role | Function | Analogy
Observable (the observed) | Generates events | The customer
Observer | Receives events and responds to them | The kitchen
Subscribe | Connects the observed with the observer | The waiter
Event | The carrier of communication between the observed and the observer | The dishes
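To make the restaurant analogy concrete, here is a minimal plain-Java sketch of the four roles. It is only an illustration of the pattern, not how RxJava is implemented; the names RolesSketch, Customer, Kitchen, and serve are hypothetical and do not come from the library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the four roles. All names are hypothetical, not RxJava types.
final class RolesSketch {

    // Observer (the kitchen): receives each event and responds to it.
    interface Kitchen {
        void onDish(String dish);
    }

    // Observable (the customer): holds the events it wants to send.
    static final class Customer {
        private final List<String> orders;

        Customer(List<String> orders) {
            this.orders = orders;
        }

        // Subscribe (the waiter): connects the customer to a kitchen,
        // then carries every event (dish) across in order.
        void subscribe(Kitchen kitchen) {
            for (String dish : orders) {
                kitchen.onDish(dish);
            }
        }
    }

    // Runs one subscription and returns what the kitchen received.
    static List<String> serve(List<String> orders) {
        List<String> received = new ArrayList<>();
        new Customer(orders).subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(serve(Arrays.asList("noodles", "dumplings")));
    }
}
```

Nothing is sent until subscribe is called, which is the same order of operations the steps below follow with the real library.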

Second, basic use

1. Add the dependencies

implementation 'io.reactivex.rxjava2:rxjava:2.2.19'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

2. Create the observed (Observable)

// Create the observed
public Observable<Integer> createObservable() {
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            // Send three events, then signal completion
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    });
    return observable;
}

3. Create an observer

// Create the observer
public Observer<Integer> createObserver() {
    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "onSubscribe connection");
        }

        @Override
        public void onNext(Integer value) {
            Log.v("lanjiabinRx", "onNext " + value + " event");
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "onError event");
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "onComplete event");
        }
    };
    return observer;
}

4. Connect the two with subscribe()

public void createSubscribe() {
    createObservable().subscribe(createObserver());
}

5. Call and result

createSubscribe();

6. Chain calls

public void chainCall() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "onSubscribe connection");
        }

        @Override
        public void onNext(Integer value) {
            Log.v("lanjiabinRx", "onNext " + value + " event");
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "onError event");
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "onComplete event");
        }
    });
}

7. Disconnect

After disconnecting, the observer no longer receives events from the observed, but the observed can continue to send them.

// Inside the Observer implementation:

// 1. Field that holds the Disposable
Disposable mDisposable;

@Override
public void onSubscribe(Disposable d) {
    mDisposable = d; // 2. Keep the Disposable received on subscription
    Log.v("lanjiabinRx", "onSubscribe connection");
}

@Override
public void onNext(Integer value) {
    if (value == 2) mDisposable.dispose(); // 3. Disconnect on the second onNext event
    Log.v("lanjiabinRx", "onNext " + value + " event");
}
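The distinction between "the observed keeps sending" and "the observer stops receiving" can be modelled in a few lines of plain Java. This is a simplified stand-in, not RxJava's implementation: Connection is a hypothetical flag that plays the role of the Disposable, and run, sent, and received are names made up for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of disposal. All names are hypothetical, not RxJava types.
final class DisposeSketch {

    // Stand-in for a Disposable: a simple flag.
    static final class Connection {
        private boolean disposed;

        void dispose() {
            disposed = true;
        }

        boolean isDisposed() {
            return disposed;
        }
    }

    // Records what the source sent and what the observer actually handled.
    static final class Result {
        final List<Integer> sent = new ArrayList<>();
        final List<Integer> received = new ArrayList<>();
    }

    // The source sends 1..total; the observer disposes while handling disposeAt.
    static Result run(int total, int disposeAt) {
        Result result = new Result();
        Connection connection = new Connection();
        for (int value = 1; value <= total; value++) {
            result.sent.add(value);          // the source keeps sending regardless
            if (connection.isDisposed()) {
                continue;                    // a disposed observer is skipped
            }
            if (value == disposeAt) {
                connection.dispose();        // disposal happens inside the handler
            }
            result.received.add(value);      // the current event is still handled
        }
        return result;
    }

    public static void main(String[] args) {
        Result result = run(3, 2);
        System.out.println("sent=" + result.sent + " received=" + result.received);
    }
}
```

As in the snippet above, the event that triggers the disposal is still handled to the end; only later events are dropped.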

Third, creation operators

0. General layout

1. create (basic sending)

The most basic way to create an Observable.

//1. create
public void chainCall() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "onSubscribe connection");
        }

        @Override
        public void onNext(Integer value) {
            Log.v("lanjiabinRx", "onNext " + value + " event");
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "onError event");
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "onComplete event");
        }
    });
}

2. just (send up to 10 events immediately)

  • Quickly creates an Observable
  • Sending behavior: sends the arguments passed in, directly
  • Accepts at most ten arguments
  • Application scenario: quickly create an Observable that sends up to 10 events

//2. just
public void justDo() {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d("lanjiabinRx", "Start connecting via subscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.d("lanjiabinRx", "Received event onNext = " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.d("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d("lanjiabinRx", "Received event onComplete");
        }
    });
}


3. fromArray (send an array)

  • Quickly creates an Observable
  • Sending behavior: sends the elements of the array passed in, directly
  • Converts the data in the array into an Observable

Application scenarios:
1. Quickly create an Observable that sends more than 10 events (in array form)
2. Traverse the elements of an array

//3. fromArray
public void fromArrayDo() {
    Integer[] items = {0, 1, 2, 3, 4};
    Observable.fromArray(items).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Start connecting via subscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.v("lanjiabinRx", "Received event onNext = " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}


4. fromIterable (send a collection)

  • Quickly creates an Observable
  • Sending behavior: sends the elements of the collection (List) passed in, directly
  • Converts the data in the collection into an Observable

Application scenarios:
1. Quickly create an Observable that sends more than 10 events (in collection form)
2. Traverse the elements of a collection

//4. fromIterable
public void fromIterableDo() {
    List<Integer> list = new ArrayList<>();
    list.add(1);
    list.add(2);
    list.add(3);
    Observable.fromIterable(list).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Start connecting via subscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.v("lanjiabinRx", "Received event onNext = " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}


// empty(): sends only the Complete event
Observable<Object> observable1 = Observable.empty();
// The observer's onComplete() is called directly

// error(): sends only the Error event; the exception can be customized
Observable<Object> observable2 = Observable.error(new RuntimeException());
// The observer's onError() is called directly

// never(): sends no events
Observable<Object> observable3 = Observable.never();
// None of the observer's onNext(), onError(), or onComplete() is called

5. defer (get the latest data)

  • The Observable is not created, and no events are sent, until an Observer subscribes
  • The Observable is created through an Observable factory method
  • Every subscription gets a newly created Observable, which ensures that the data in the Observable is up to date

Application scenario: dynamically create an Observable and get the latest Observable data

//5. defer
Integer i = 10;

public void deferDo() {
    // The factory's call() is not invoked here, only when an Observer subscribes
    Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
        @Override
        public ObservableSource<? extends Integer> call() throws Exception {
            return Observable.just(i);
        }
    });

    // Reassign before subscribing
    i = 15;

    observable.subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Start connecting via subscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.v("lanjiabinRx", "Received event onNext = " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}

Result: the observer receives 15, the value most recently assigned to i, which shows that the latest data was obtained.
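Why the deferred version sees the later assignment can be shown without the library. The sketch below is a plain-Java analogy under simplifying assumptions: a captured local value stands in for a value read when the Observable is built, and a java.util.function.Supplier stands in for the factory passed to defer. The names DeferSketch and run are hypothetical.

```java
import java.util.function.Supplier;

// Plain-Java analogy for eager versus deferred reads. Not RxJava code.
final class DeferSketch {

    static int i = 10;

    // Returns { value captured eagerly, value read lazily }.
    static int[] run() {
        i = 10;
        final int eager = i;                   // read now, at creation time
        Supplier<Integer> deferred = () -> i;  // read later, when get() is called
        i = 15;                                // reassign before "subscribing"
        return new int[] { eager, deferred.get() };
    }

    public static void main(String[] args) {
        int[] values = run();
        System.out.println("eager=" + values[0] + " deferred=" + values[1]);
    }
}
```

The eager read keeps the old value, while the supplier, like the defer factory, reads the field only when it is finally asked for a value.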

6. timer (delayed sending)

  • Quickly creates an Observable
  • Sending behavior: after a specified delay, sends the single value 0 (of type Long)
  • In essence: onNext(0) is called once after the specified delay

Application scenario: wait for a specified time, then send a 0; generally used for detection

//6. timer
public void timerDo() {
    /*
     * Note: timer runs on the computation Scheduler by default, not on the calling thread.
     * A custom Scheduler can be passed as a third argument: timer(long, TimeUnit, Scheduler)
     *
     * timer(long delay, TimeUnit unit)
     * delay: the delay value
     * unit: the time unit
     * So here the delay is 2 seconds
     */
    Observable.timer(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Start connecting via subscribe");
        }

        @Override
        public void onNext(Long aLong) {
            // The value received is 0; generally used for detection
            Log.v("lanjiabinRx", "Received event onNext = " + aLong);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}


7. interval (periodic sending, unlimited)

  • Quickly creates an Observable
  • Sending behavior: sends an event at each specified interval
  • The sequence sent is the integers starting at 0 and increasing by 1 without end

//7. interval
public void intervalDo() {
    /*
     * Counts up from 0
     *
     * @param initialDelay (long) delay before the first value is sent
     * @param period (long) time between subsequent values (one cycle)
     * @param unit time unit
     */
    Observable.interval(3, 2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Start connecting via subscribe");
        }

        @Override
        public void onNext(Long aLong) {
            Log.v("lanjiabinRx", "Received event onNext = " + aLong);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}


8. intervalRange (periodic sending, limited, specified data)

  • Works like interval(), but lets you specify how many values to send

//8. intervalRange
public void intervalRangeDo() {
    /*
     * @param start the first value
     * @param count total number of values to send; onComplete is signalled after the last one
     * @param initialDelay delay before the first value (start) is sent
     * @param period time between subsequent values
     * @param unit time unit
     */
    Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.v("lanjiabinRx", "Start connecting via subscribe");
                }

                @Override
                public void onNext(Long aLong) {
                    Log.v("lanjiabinRx", "Received event onNext = " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.v("lanjiabinRx", "Received event onComplete");
                }
            });
}

Results: 10 numbers are received, from 3 to 12.
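The relationship between the parameters and the values received is simple arithmetic: the k-th value is start + k, and its nominal send time is initialDelay + k * period. The sketch below only computes that nominal schedule in plain Java; it does not use RxJava or wait in real time, real timings depend on the Scheduler, and the names IntervalRangeSketch and schedule are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Computes the nominal (value, send time) pairs for an intervalRange-style sequence.
final class IntervalRangeSketch {

    // Each entry is { value, nominal send time in the caller's time unit }.
    static List<long[]> schedule(long start, long count, long initialDelay, long period) {
        List<long[]> entries = new ArrayList<>();
        for (long k = 0; k < count; k++) {
            entries.add(new long[] { start + k, initialDelay + k * period });
        }
        return entries;
    }

    public static void main(String[] args) {
        // Same arguments as the call above: start 3, count 10, delay 2, period 1.
        for (long[] entry : schedule(3, 10, 2, 1)) {
            System.out.println("value " + entry[0] + " at about " + entry[1] + " s");
        }
    }
}
```

With start 3 and count 10, the last value is 3 + 9 = 12, which matches the range reported in the result.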

9. range (no delay, Integer values)

  • Works like intervalRange(), except that events are sent without any delay

//9. range
public void rangeDo() {
    /*
     * @param start the first value
     * @param count the number of sequential integers to generate
     */
    Observable.range(3, 5).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Start connecting via subscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.v("lanjiabinRx", "Received event onNext = " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}


10. rangeLong (no delay, Long values)

//10. rangeLong
public void rangeLongDo() {
    /*
     * @param start (long) the first value
     * @param count (long) the number of sequential integers to generate
     */
    Observable.rangeLong(3, 8).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Start connecting via subscribe");
        }

        @Override
        public void onNext(Long aLong) {
            Log.v("lanjiabinRx", "Received event onNext = " + aLong);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}


How many frustrations do we encounter in programming? Where desert ends, green grass grows.