Zero, source
Source: Carson_Ho, Jianshu
First, basic knowledge
| Role | Responsibility | Analogy |
|---|---|---|
| Observable (the observed) | Generates events | The customer |
| Observer | Receives events and responds to them | The kitchen |
| Subscribe | Connects the Observable with the Observer | The waiter |
| Event | The carrier of communication between the Observable and the Observer | The dishes |
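The four roles in the table above can be sketched in plain Java without any library. This is only an illustrative model, not the RxJava API: `RolesSketch`, `SimpleObservable` and `SimpleObserver` are hypothetical names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the roles in the table; NOT the RxJava API.
final class RolesSketch {

    // Observer: receives each event and responds to it.
    interface SimpleObserver<T> {
        void onNext(T value);
        void onComplete();
    }

    // Observable: holds the logic that generates events.
    static final class SimpleObservable<T> {
        private final Consumer<SimpleObserver<T>> source;

        SimpleObservable(Consumer<SimpleObserver<T>> source) {
            this.source = source;
        }

        // Subscribe: connects the two sides; events only start flowing here.
        void subscribe(SimpleObserver<T> observer) {
            source.accept(observer);
        }
    }

    // Runs one subscription and returns what the observer saw, in order.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        SimpleObservable<Integer> observable = new SimpleObservable<Integer>(observer -> {
            observer.onNext(1);
            observer.onNext(2);
            observer.onNext(3);
            observer.onComplete();
        });
        observable.subscribe(new SimpleObserver<Integer>() {
            @Override
            public void onNext(Integer value) {
                log.add("onNext " + value);
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [onNext 1, onNext 2, onNext 3, onComplete]
        System.out.println(run());
    }
}
```

Nothing is emitted when the observable is constructed; the events are produced only when `subscribe` is called, which is the same ordering the RxJava examples in this article rely on.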
Second, basic usage
1. Add the dependencies
```groovy
implementation 'io.reactivex.rxjava2:rxjava:2.2.19'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
```
2. Create the Observable (the observed)
```java
// Create the Observable (the observed)
public Observable<Integer> createObservable() {
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    });
    return observable;
}
```
3. Create an observer
```java
// Create the Observer
public Observer<Integer> createObserver() {
    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "onSubscribe connection");
        }

        @Override
        public void onNext(Integer value) {
            Log.v("lanjiabinRx", "onNext " + value + " event");
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "onError event");
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "onComplete event");
        }
    };
    return observer;
}
```
4. Establish the connection with subscribe()
```java
// Connect the Observable and the Observer
public void createSubscribe() {
    createObservable().subscribe(createObserver());
}
```
5. Call and result
```java
createSubscribe();
```
6. Chained call
```java
// The same flow written as a single chained call
public void chainCall() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "onSubscribe connection");
        }

        @Override
        public void onNext(Integer value) {
            Log.v("lanjiabinRx", "onNext " + value + " event");
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "onError event");
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "onComplete event");
        }
    });
}
```
7. Disconnect
After disconnecting, the Observer no longer receives events from the Observable, but the Observable can still go on sending them.
```java
// Fragment of an Observer implementation

// 1. Field that holds the Disposable
Disposable mDisposable;

@Override
public void onSubscribe(Disposable d) {
    mDisposable = d; // 2. Keep the Disposable handed over on subscription
    Log.v("lanjiabinRx", "onSubscribe connection");
}

@Override
public void onNext(Integer value) {
    if (value == 2) mDisposable.dispose(); // 3. Disconnect on the second onNext event
    Log.v("lanjiabinRx", "onNext " + value + " event");
}
```
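The effect of disconnecting can be modelled in plain Java. This is a simplified sketch, not RxJava itself: `DisposeSketch` and its `disposed` flag are hypothetical stand-ins for the Disposable, and the loop stands in for an emitter that sends 1, 2, 3.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of disposing a subscription; NOT the RxJava API.
final class DisposeSketch {
    final List<Integer> emitted = new ArrayList<>();   // everything the source sent
    final List<Integer> received = new ArrayList<>();  // everything the observer saw
    private boolean disposed = false;                  // stand-in for the Disposable

    // Observer side: disposes when it sees the value 2, then still records that value.
    private void onNext(int value) {
        if (value == 2) {
            disposed = true;
        }
        received.add(value);
    }

    // Source side: keeps emitting, but delivery stops once disposed.
    void run() {
        for (int value = 1; value <= 3; value++) {
            emitted.add(value);
            if (!disposed) {
                onNext(value);
            }
        }
    }

    public static void main(String[] args) {
        DisposeSketch sketch = new DisposeSketch();
        sketch.run();
        // prints emitted=[1, 2, 3], received=[1, 2]
        System.out.println("emitted=" + sketch.emitted + ", received=" + sketch.received);
    }
}
```

The source still sends its third value, but the observer never sees it, which is the asymmetry described above.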
Third, creation operators
0. Overview
1. create (basic creation)
The most basic way to create an Observable.
```java
// 1. create
public void chainCall() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "onSubscribe connection");
        }

        @Override
        public void onNext(Integer value) {
            Log.v("lanjiabinRx", "onNext " + value + " event");
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "onError event");
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "onComplete event");
        }
    });
}
```
2. just (send up to 10 items immediately)
- Quickly creates an Observable
- Send event features: sends the items passed in, directly
- At most ten arguments can be passed
- Application scenario: quickly create an Observable and send up to 10 events
```java
// 2. just
public void justDo() {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d("lanjiabinRx", "Connected via subscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.d("lanjiabinRx", "Received event onNext = " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.d("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d("lanjiabinRx", "Received event onComplete");
        }
    });
}
```
Result: onNext receives 1 through 10 in order, then onComplete is called.
3. fromArray (send an array)
- Quickly creates an Observable
- Send event features: sends the elements of the array passed in, directly
- Converts the data in the array into an Observable
Application scenarios: (1) quickly create an Observable and send more than 10 events (in array form); (2) traverse the elements of an array
```java
// 3. fromArray
public void fromArrayDo() {
    Integer[] items = {0, 1, 2, 3, 4};
    Observable.fromArray(items).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Connected via subscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.v("lanjiabinRx", "Received event onNext = " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}
```
Result: onNext receives 0 through 4 in order, then onComplete is called.
4. fromIterable (send a collection)
- Quickly creates an Observable
- Send event features: sends the elements of the collection (List) passed in, directly
- Converts the data in the collection into an Observable
Application scenarios: (1) quickly create an Observable and send more than 10 events (in collection form); (2) traverse the elements of a collection
```java
// 4. fromIterable
public void fromIterableDo() {
    List<Integer> list = new ArrayList<>();
    list.add(1);
    list.add(2);
    list.add(3);
    Observable.fromIterable(list).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Connected via subscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.v("lanjiabinRx", "Received event onNext = " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}
```
Result: onNext receives 1, 2, 3 in order, then onComplete is called.
```java
// empty(): the Observable sends only the Complete event
Observable<Object> observable1 = Observable.empty();
// The observer goes straight to onComplete()

// error(): the Observable sends only the Error event (the exception can be customized)
Observable<Object> observable2 = Observable.error(new RuntimeException());
// The observer goes straight to onError()

// never(): the Observable sends no events at all
Observable<Object> observable3 = Observable.never();
// The observer receives nothing and none of its callbacks are invoked
```
5. defer (get the latest data)
- The Observable is not created until an Observer subscribes; only then is it created dynamically and its events sent
- The Observable is created through an Observable factory method
- Every subscription gets a newly created Observable, which ensures that the data in the Observable is up to date
Application scenario: dynamically create an Observable and get the latest Observable data
```java
// 5. defer
Integer i = 10;

public void deferDo() {
    // Only the factory is registered here; no Observable is created yet
    Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
        @Override
        public ObservableSource<? extends Integer> call() throws Exception {
            return Observable.just(i);
        }
    });

    i = 15;

    // The factory runs at subscription time, so it reads the current value of i
    observable.subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Connected via subscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.v("lanjiabinRx", "Received event onNext = " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}
```
Result: onNext receives 15, the most recently assigned value, which shows that the latest data was obtained.
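The difference between building the value eagerly and deferring it until subscription can be reproduced with a plain `java.util.function.Supplier`. This is a minimal sketch that does not use RxJava: `DeferSketch` is a hypothetical class, and the `Supplier` only plays the role of the factory passed to defer.

```java
import java.util.function.Supplier;

// Plain-Java model of eager vs deferred evaluation; NOT the RxJava API.
final class DeferSketch {
    private int i = 10;

    // Eager: the value is read when the source is built, before i changes.
    int eagerThenChange() {
        int snapshot = i; // comparable to building the source from i right away
        i = 15;
        return snapshot;
    }

    // Deferred: only a factory is stored; i is read when the factory is invoked.
    int deferredThenChange() {
        Supplier<Integer> factory = () -> i; // comparable to the factory given to defer
        i = 15;
        return factory.get(); // evaluated at "subscription" time
    }

    public static void main(String[] args) {
        System.out.println(new DeferSketch().eagerThenChange());    // prints 10
        System.out.println(new DeferSketch().deferredThenChange()); // prints 15
    }
}
```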
6. timer (delayed send)
- Quickly creates an Observable
- Send event features: after a specified delay, sends a single value 0 (type Long)
- Essence = calls onNext(0) once after the specified delay
Application scenario: after the specified delay, send a single 0; generally used for detection
```java
// 6. timer
public void timerDo() {
    /*
     * Note: by default timer does not run on the calling thread; it uses the
     * computation Scheduler. A custom Scheduler can be passed as the third
     * argument: timer(long, TimeUnit, Scheduler)
     *
     * timer(long delay, TimeUnit unit)
     *   delay: the delay value
     *   unit:  the time unit
     * So the call below means a delay of 2 seconds.
     */
    Observable.timer(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Connected via subscribe");
        }

        @Override
        public void onNext(Long aLong) {
            // The value is 0; generally used for detection
            Log.v("lanjiabinRx", "Received event onNext = " + aLong);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}
```
Result: after a 2-second delay, onNext receives 0, then onComplete is called.
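As a rough analogy for "send a single 0 after a delay, off the calling thread", the same shape can be written with the JDK alone. This sketch assumes Java 9 or later for `CompletableFuture.delayedExecutor`; `TimerSketch` and its `timer` method are hypothetical names, not part of RxJava.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// JDK-only analogy for a one-shot delayed value; NOT the RxJava API.
final class TimerSketch {

    // Completes with a single 0L after the delay, on a background thread.
    static CompletableFuture<Long> timer(long delay, TimeUnit unit) {
        Executor delayed = CompletableFuture.delayedExecutor(delay, unit);
        return CompletableFuture.supplyAsync(() -> 0L, delayed);
    }

    public static void main(String[] args) {
        // join() blocks the caller until the delayed value arrives
        long value = timer(200, TimeUnit.MILLISECONDS).join();
        System.out.println("received " + value); // prints received 0
    }
}
```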
7. interval (periodic send, unlimited)
- Quickly creates an Observable
- Send event features: sends an event at every specified interval
- The sequence of events sent = an unbounded sequence of integers starting at 0 and increasing by 1
```java
// 7. interval
public void intervalDo() {
    /*
     * Emits increasing numbers starting from 0.
     *
     * @param initialDelay (long) delay before the first emission
     * @param period       (long) interval between subsequent emissions (one cycle)
     * @param unit         time unit
     */
    Observable.interval(3, 2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Connected via subscribe");
        }

        @Override
        public void onNext(Long aLong) {
            Log.v("lanjiabinRx", "Received event onNext = " + aLong);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}
```
Result: after an initial 3-second delay, onNext receives 0, 1, 2, ... at 2-second intervals, without end.
8. intervalRange (periodic send, limited, specified data)
- Works like interval(), but lets you specify how many values to send
```java
// 8. intervalRange
public void intervalRangeDo() {
    /*
     * @param start        the first value
     * @param count        total number of values to send; onComplete is issued afterwards
     * @param initialDelay delay before the first value (start) is issued
     * @param period       interval between subsequent values
     * @param unit         time unit
     */
    Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.v("lanjiabinRx", "Connected via subscribe");
                }

                @Override
                public void onNext(Long aLong) {
                    Log.v("lanjiabinRx", "Received event onNext = " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.v("lanjiabinRx", "Received event onComplete");
                }
            });
}
```
Result: 10 numbers are received, 3 through 12.
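How the four numeric arguments combine can be checked with a small calculation. The sketch below only computes the nominal schedule (which value is due at which second after subscription) and ignores real scheduler jitter; `IntervalRangeSchedule` is a hypothetical helper, not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;

// Computes the nominal emission schedule for (start, count, initialDelay, period).
final class IntervalRangeSchedule {

    // Each entry is {value, nominal emission time in the given time unit}.
    static List<long[]> schedule(long start, long count, long initialDelay, long period) {
        List<long[]> out = new ArrayList<>();
        for (long k = 0; k < count; k++) {
            out.add(new long[] {start + k, initialDelay + k * period});
        }
        return out;
    }

    public static void main(String[] args) {
        // Same arguments as the example above: start 3, count 10, initial delay 2, period 1
        for (long[] entry : schedule(3, 10, 2, 1)) {
            System.out.println("value " + entry[0] + " at t = " + entry[1] + " s");
        }
    }
}
```

With these arguments the first entry is value 3 at 2 seconds and the last is value 12 at 11 seconds, which matches the 10 numbers from 3 to 12 noted above.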
9. range (no delay, Integer values)
- Similar to intervalRange(), except that the events are sent without any delay
```java
// 9. range
public void rangeDo() {
    /*
     * @param start the first value of the sequence
     * @param count the number of sequential integers to generate
     */
    Observable.range(3, 5).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Connected via subscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.v("lanjiabinRx", "Received event onNext = " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}
```
Result: onNext receives 3 through 7 in order, then onComplete is called.
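A point that is easy to misread is that the second argument is a count, not an end value. The JDK's `IntStream.range` takes an exclusive end instead, so a small adapter makes the difference visible. This is an illustrative sketch only; `RangeSketch` is a hypothetical helper and does not use RxJava.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Shows "start + count" semantics using JDK streams; NOT the RxJava API.
final class RangeSketch {

    // Returns count consecutive integers beginning at start.
    static List<Integer> range(int start, int count) {
        // IntStream.range takes an exclusive end, so the end is start + count
        return IntStream.range(start, start + count)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(range(3, 5)); // prints [3, 4, 5, 6, 7]
    }
}
```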
10. rangeLong (no delay, Long values)
```java
// 10. rangeLong
public void rangeLongDo() {
    /*
     * @param start (long) the first value of the sequence
     * @param count (long) the number of sequential integers to generate
     */
    Observable.rangeLong(3, 8).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v("lanjiabinRx", "Connected via subscribe");
        }

        @Override
        public void onNext(Long aLong) {
            Log.v("lanjiabinRx", "Received event onNext = " + aLong);
        }

        @Override
        public void onError(Throwable e) {
            Log.v("lanjiabinRx", "Received event onError = " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.v("lanjiabinRx", "Received event onComplete");
        }
    });
}
```
Result: onNext receives 3 through 10 in order, then onComplete is called.
How many frustrations do we encounter in programming? Where desert ends, green grass grows.