- create: Creates an Observable from scratch using OnSubscribe. When creating an Observable this way, it is recommended to check the subscription status inside OnSubscribe#call so that emission or computation can stop promptly once the subscriber has unsubscribed.
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("item1"); subscriber.onNext("item2"); subscriber.onCompleted(); } });
- from: Converts an Iterable, a Future, or an array into an Observable. Internally, a Future is converted to an OnSubscribe via OnSubscribeToObservableFuture, an Iterable via OnSubscribeFromIterable, and an array via OnSubscribeFromArray.
List<String> list = new ArrayList<>(); /* ... */ Observable.from(list).subscribe(new Action1<String>() { @Override public void call(String s) { } });
Future<String> future = Executors.newSingleThreadExecutor().submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(1000); return "maplejaw"; } }); Observable.from(future).subscribe(new Action1<String>() { @Override public void call(String s) { } });
- just: Converts one or more objects into an Observable that emits them. For a single object, a ScalarSynchronousObservable is created internally; for multiple objects, from is called.
- empty: Creates an Observable that emits no items and completes immediately.
- error: Creates an Observable that emits no items and immediately notifies an error.
- never: Creates an Observable that emits no items and never terminates.
Observable observable1 = Observable.empty(); Observable observable2 = Observable.error(new RuntimeException()); Observable observable3 = Observable.never();
- timer: Creates an Observable that emits the single item 0 after the given delay. Implemented internally by OnSubscribeTimerOnce.
Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("JG", aLong.toString()); } });
- interval: Creates an Observable that emits a sequence of integers starting from 0, one per given time interval. Implemented internally by OnSubscribeTimerPeriodically.
Observable.interval(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() { @Override public void call(Long aLong) { } });
- range: Creates an Observable that emits a sequence of integers in a specified range. The arguments are the start value and the count, so range(2, 5) emits 2, 3, 4, 5, 6.
Observable.range(2, 5).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("JG", integer.toString()); } });
- defer: The Observable is not created until a subscriber subscribes, and a fresh Observable is created for each subscription. Internally, OnSubscribeDefer calls the Func0 at subscribe time to create the Observable.
Observable.defer(new Func0<Observable<String>>() { @Override public Observable<String> call() { return Observable.just("hello"); } }).subscribe(new Action1<String>() { @Override public void call(String s) { Log.d("JG", s); } });
- concat: Joins multiple Observables in sequence. Note that Observable.concat(a, b) is equivalent to a.concatWith(b).
Observable<Integer> observable1 = Observable.just(1, 2, 3, 4); Observable<Integer> observable2 = Observable.just(4, 5, 6); Observable.concat(observable1, observable2).subscribe(item -> Log.d("JG", item.toString()));
- startWith: Adds an item to the beginning of a sequence. Internally, startWith also calls concat.
Observable.just(1, 2, 3, 4, 5).startWith(6).subscribe(item -> Log.d("JG", item.toString()));
- merge: Merges multiple Observables into one. Unlike concat, merge does not join the sources in the order they were added but interleaves their items along the timeline. mergeDelayError delays the error until the other, error-free Observables have finished emitting, whereas merge stops emitting and sends the onError notification as soon as it encounters an error.
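The ordering difference between concat and merge can be sketched in plain Java. This is only a model of the output order, not RxJava code: the `MergeVsConcat` class, the `Timed` type, and the emission times are hypothetical, and the model ignores that concat subscribes to the second source only after the first completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Plain-Java model of the output order of concat vs. merge (not RxJava itself). */
final class MergeVsConcat {

    /** A value together with the (hypothetical) time in ms at which its source emits it. */
    static final class Timed {
        final long timeMs;
        final String value;
        Timed(long timeMs, String value) { this.timeMs = timeMs; this.value = value; }
    }

    /** concat model: every item of the first source, then every item of the second. */
    static List<String> concat(List<Timed> first, List<Timed> second) {
        List<String> out = new ArrayList<>();
        for (Timed t : first) out.add(t.value);
        for (Timed t : second) out.add(t.value);
        return out;
    }

    /** merge model: items of both sources interleaved by emission time. */
    static List<String> merge(List<Timed> first, List<Timed> second) {
        List<Timed> all = new ArrayList<>(first);
        all.addAll(second);
        // List.sort is stable, so items with equal times keep their relative order.
        all.sort(Comparator.comparingLong((Timed t) -> t.timeMs));
        List<String> out = new ArrayList<>();
        for (Timed t : all) out.add(t.value);
        return out;
    }

    public static void main(String[] args) {
        List<Timed> first = Arrays.asList(new Timed(0, "a"), new Timed(200, "b"), new Timed(400, "c"));
        List<Timed> second = Arrays.asList(new Timed(100, "x"), new Timed(300, "y"));
        System.out.println(concat(first, second)); // [a, b, c, x, y]
        System.out.println(merge(first, second));  // [a, x, b, y, c]
    }
}
```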
- zip: Uses a function to combine the items emitted by multiple Observables and emits the results. If the Observables emit different numbers of items, the one that emits the fewest determines how many combined items are produced. Implemented internally by OperatorZip.
Observable<Integer> observable1 = Observable.just(1, 2, 3, 4); Observable<Integer> observable2 = Observable.just(4, 5, 6); Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() { @Override public String call(Integer item1, Integer item2) { return item1 + "and" + item2; } }).subscribe(item -> Log.d("JG", item));
- combineLatest: Whenever either of two Observables emits an item, combines the latest item emitted by each Observable with a specified function and emits the result of that function. It is similar to zip, but zip produces a result only once every Observable has emitted a new item, while combineLatest produces a result whenever any Observable emits an item, each time combining it with the latest item of the other Observable.
[Figure: zip workflow]
[Figure: combineLatest workflow]
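To make the difference concrete, here is a small plain-Java model of the two pairing rules applied to one shared timeline. It is a sketch under assumptions rather than RxJava code: the `ZipVsCombineLatest` class, the `Event` type, and the sample timeline are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of zip vs. combineLatest over one timeline of two sources "A" and "B". */
final class ZipVsCombineLatest {

    /** One emission on the timeline: which source emitted, and the value. */
    static final class Event {
        final String source;
        final String value;
        Event(String source, String value) { this.source = source; this.value = value; }
    }

    /** zip model: pairs the n-th item of A with the n-th item of B; the shorter source limits the output. */
    static List<String> zip(List<Event> timeline) {
        List<String> a = new ArrayList<>();
        List<String> b = new ArrayList<>();
        for (Event e : timeline) {
            if (e.source.equals("A")) a.add(e.value); else b.add(e.value);
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < Math.min(a.size(), b.size()); i++) {
            out.add(a.get(i) + b.get(i));
        }
        return out;
    }

    /** combineLatest model: once both sources have emitted, every new item is combined with the other's latest. */
    static List<String> combineLatest(List<Event> timeline) {
        String latestA = null;
        String latestB = null;
        List<String> out = new ArrayList<>();
        for (Event e : timeline) {
            if (e.source.equals("A")) latestA = e.value; else latestB = e.value;
            if (latestA != null && latestB != null) out.add(latestA + latestB);
        }
        return out;
    }

    public static void main(String[] args) {
        // Timeline: A emits 1, B emits x, A emits 2, A emits 3, B emits y.
        List<Event> timeline = Arrays.asList(
            new Event("A", "1"), new Event("B", "x"), new Event("A", "2"),
            new Event("A", "3"), new Event("B", "y"));
        System.out.println(zip(timeline));           // [1x, 2y]
        System.out.println(combineLatest(timeline)); // [1x, 2x, 3x, 3y]
    }
}
```

In the model, item 3 from A never appears in the zip output because B has no third item to pair it with, while combineLatest reuses B's latest value for every new item from A.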
- filter: Filters items with a predicate. Implemented internally by OnSubscribeFilter.
Observable.just(3, 4, 5, 6).filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 4; } }).subscribe(item -> Log.d("JG", item.toString()));
- ofType: Emits only items of the specified type. Similar to filter.
Observable.just(1, 2, "3").ofType(Integer.class).subscribe(item -> Log.d("JG", item.toString()));
- take: Emits only the first N items, or only the items emitted within an initial period of time. Implemented internally by OperatorTake and OperatorTakeTimed.
Observable.just(3, 4, 5, 6).take(3).take(100, TimeUnit.MILLISECONDS);
- takeLast: Emits only the last N items, or only the items emitted within a final period of time. Implemented internally by OperatorTakeLast and OperatorTakeLastTimed. takeLastBuffer is similar to takeLast, except that the items are collected into a List before being emitted.
Observable.just(3, 4, 5, 6).takeLast(3).subscribe(integer -> Log.d("JG", integer.toString()));
- takeFirst: Emits the first item that satisfies the condition. The internal implementation is as follows:
public final Observable<T> takeFirst(Func1<? super T, Boolean> predicate) { return filter(predicate).take(1); }
- first/firstOrDefault: Emits only the first item (or the first item that satisfies a condition). A default value can be specified.
Observable.just(3, 4, 5, 6).first().subscribe(integer -> Log.d("JG", integer.toString()));
Observable.just(3, 4, 5, 6).first(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 3; } }).subscribe(integer -> Log.d("JG", integer.toString()));
- last/lastOrDefault: Emits only the last item (or the last item that satisfies a condition). A default value can be specified.
- skip: Skips the first N items, or the items emitted within an initial period of time. Implemented internally by OperatorSkip and OperatorSkipTimed.
Observable.just(3, 4, 5, 6).skip(1).subscribe(integer -> Log.d("JG", integer.toString()));
- skipLast: Skips the last N items, or the items emitted within a final period of time. Implemented internally by OperatorSkipLast and OperatorSkipLastTimed.
- elementAt/elementAtOrDefault: Emits the item at the specified index; elementAtOrDefault emits a default value if the index is out of range. Implemented internally by OperatorElementAt.
Observable.just(3, 4, 5, 6).elementAt(2).subscribe(item -> Log.d("JG", item.toString()));
- ignoreElements: Discards all items and emits only the error or normal termination notification. Implemented internally by OperatorIgnoreElements.
- distinct: Filters out duplicate items. Implemented internally by OperatorDistinct.
Observable.just(3, 4, 5, 6, 3, 3, 4, 9).distinct().subscribe(item -> Log.d("JG", item.toString()));
- distinctUntilChanged: Filters out consecutive duplicate items. Implemented internally by OperatorDistinctUntilChanged.
Observable.just(3, 4, 5, 6, 3, 3, 4, 9).distinctUntilChanged().subscribe(item -> Log.d("JG", item.toString()));
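The two filters differ only in what they remember. A minimal plain-Java sketch over the same input as the examples above, assuming equality is decided by `equals` (the `DistinctSketch` class is hypothetical and is not how the RxJava operators are implemented):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Plain-Java model of distinct vs. distinctUntilChanged. */
final class DistinctSketch {

    /** distinct model: remembers every item seen so far and drops any repeat. */
    static <T> List<T> distinct(List<T> items) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (seen.add(item)) out.add(item); // add() returns false for a repeat
        }
        return out;
    }

    /** distinctUntilChanged model: remembers only the previous item and drops immediate repeats. */
    static <T> List<T> distinctUntilChanged(List<T> items) {
        List<T> out = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T item : items) {
            if (first || !Objects.equals(previous, item)) out.add(item);
            previous = item;
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(3, 4, 5, 6, 3, 3, 4, 9);
        System.out.println(distinct(items));             // [3, 4, 5, 6, 9]
        System.out.println(distinctUntilChanged(items)); // [3, 4, 5, 6, 3, 4, 9]
    }
}
```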
- throttleFirst: Periodically emits the first item the Observable emits within each time window. Implemented internally by OperatorThrottleFirst.
Observable.create(subscriber -> { subscriber.onNext(1); try { Thread.sleep(500); } catch (InterruptedException e) { throw Exceptions.propagate(e); } subscriber.onNext(2); try { Thread.sleep(500); } catch (InterruptedException e) { throw Exceptions.propagate(e); } subscriber.onNext(3); try { Thread.sleep(1000); } catch (InterruptedException e) { throw Exceptions.propagate(e); } subscriber.onNext(4); subscriber.onNext(5); subscriber.onCompleted(); }).throttleFirst(999, TimeUnit.MILLISECONDS).subscribe(item -> Log.d("JG", item.toString()));
- throttleWithTimeout/debounce: Emits an item only after a specified period has passed without the source emitting another item. If two items are emitted less than the specified time apart, the earlier one is discarded.
Observable.create(subscriber -> { subscriber.onNext(1); try { Thread.sleep(500); } catch (InterruptedException e) { throw Exceptions.propagate(e); } subscriber.onNext(2); try { Thread.sleep(500); } catch (InterruptedException e) { throw Exceptions.propagate(e); } subscriber.onNext(3); try { Thread.sleep(1000); } catch (InterruptedException e) { throw Exceptions.propagate(e); } subscriber.onNext(4); subscriber.onNext(5); subscriber.onCompleted(); }).debounce(999, TimeUnit.MILLISECONDS).subscribe(item -> Log.d("JG", item.toString()));
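The two throttling rules can be compared on an idealised version of the timeline used in the examples above (items 1, 2, 3 emitted at 0 ms, 500 ms and 1000 ms, then 4 and 5 together at 2000 ms). The following is a plain-Java model under those assumed exact timings, not RxJava code; the `ThrottleSketch` class and `Timed` type are hypothetical, and a real run depends on actual thread scheduling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of throttleFirst vs. debounce over timestamped items. */
final class ThrottleSketch {

    /** An item together with the time (in ms) at which the source emits it. */
    static final class Timed {
        final long timeMs;
        final int value;
        Timed(long timeMs, int value) { this.timeMs = timeMs; this.value = value; }
    }

    /** throttleFirst model: emit an item only if at least windowMs has passed since the last emitted item. */
    static List<Integer> throttleFirst(List<Timed> events, long windowMs) {
        List<Integer> out = new ArrayList<>();
        boolean emittedAny = false;
        long lastEmitMs = 0;
        for (Timed e : events) {
            if (!emittedAny || e.timeMs - lastEmitMs >= windowMs) {
                out.add(e.value);
                lastEmitMs = e.timeMs;
                emittedAny = true;
            }
        }
        return out;
    }

    /** debounce model: emit an item only if the next one arrives at least quietMs later; the final item is flushed on completion. */
    static List<Integer> debounce(List<Timed> events, long quietMs) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            boolean isLast = i == events.size() - 1;
            if (isLast || events.get(i + 1).timeMs - events.get(i).timeMs >= quietMs) {
                out.add(events.get(i).value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Timed> events = Arrays.asList(
            new Timed(0, 1), new Timed(500, 2), new Timed(1000, 3),
            new Timed(2000, 4), new Timed(2000, 5));
        System.out.println(throttleFirst(events, 999)); // [1, 3, 4]
        System.out.println(debounce(events, 999));      // [3, 5]
    }
}
```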
- sample/throttleLast: Periodically emits the most recent item emitted by the Observable. Implemented internally by OperatorSampleWithTime.
Observable.create(subscriber -> { subscriber.onNext(1); try { Thread.sleep(500); } catch (InterruptedException e) { throw Exceptions.propagate(e); } subscriber.onNext(2); try { Thread.sleep(500); } catch (InterruptedException e) { throw Exceptions.propagate(e); } subscriber.onNext(3); try { Thread.sleep(1000); } catch (InterruptedException e) { throw Exceptions.propagate(e); } subscriber.onNext(4); subscriber.onNext(5); subscriber.onCompleted(); }).sample(999, TimeUnit.MILLISECONDS).subscribe(item -> Log.d("JG", item.toString()));
- timeout: If the source Observable emits no items for a specified period of time, raises an error or switches to a fallback Observable.
Observable.create(subscriber -> { subscriber.onNext(1); try { Thread.sleep(1000); } catch (InterruptedException e) { throw Exceptions.propagate(e); } subscriber.onNext(2); subscriber.onCompleted(); }).timeout(999, TimeUnit.MILLISECONDS, Observable.just(99, 100)).subscribe(item -> Log.d("JG", item.toString()), error -> Log.d("JG", "onError"));
- all: Determines whether all emitted items satisfy a condition. Implemented internally by OperatorAll.
Observable.just(2, 3, 4, 5).all(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 3; } }).subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("JG", aBoolean.toString()); } });
- exists: Determines whether any emitted item satisfies a condition. Implemented internally by OperatorAny.
Observable.just(2, 3, 4, 5).exists(integer -> integer > 3).subscribe(aBoolean -> Log.d("JG", aBoolean.toString()));
- contains: Determines whether the emitted items include the specified item. Internally it calls exists.
Observable.just(2, 3, 4, 5).contains(3).subscribe(aBoolean -> Log.d("JG", aBoolean.toString()));
- sequenceEqual: Determines whether two Observables emit the same sequence (same items, same order, same termination state).
Observable.sequenceEqual(Observable.just(2, 3, 4, 5), Observable.just(2, 3, 4, 5)).subscribe(aBoolean -> Log.d("JG", aBoolean.toString()));
- isEmpty: Determines whether the Observable completes without emitting any items. Emits false if any item was emitted, or true if only the onCompleted notification was received.
Observable.just(3, 4, 5, 6).isEmpty().subscribe(item -> Log.d("JG", item.toString()));
- amb: Given multiple Observables, only the first one to emit an item is allowed to emit all of its items; the other Observables are ignored.
Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { try { Thread.sleep(1000); } catch (InterruptedException e) { subscriber.onError(e); return; } subscriber.onNext(1); subscriber.onNext(2); subscriber.onCompleted(); } }).subscribeOn(Schedulers.computation()); Observable<Integer> observable2 = Observable.create(subscriber -> { subscriber.onNext(3); subscriber.onNext(4); subscriber.onCompleted(); }); Observable.amb(observable1, observable2).subscribe(integer -> Log.d("JG", integer.toString()));
- switchIfEmpty: If the source Observable terminates normally without emitting any items, a fallback Observable is used instead.
Observable.empty().switchIfEmpty(Observable.just(2, 3, 4)).subscribe(o -> Log.d("JG", o.toString()));
- defaultIfEmpty: If the source Observable terminates normally without emitting any items, emits a default value. Internally it calls switchIfEmpty.
- takeUntil: Stops emitting when an emitted item satisfies a condition (that item is still emitted) or when a second Observable emits an item.
Observable.just(2, 3, 4, 5).takeUntil(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer == 4; } }).subscribe(integer -> Log.d("JG", integer.toString()));
- takeWhile: Emits items as long as a condition holds and stops as soon as an item fails it; the failing item is not emitted. In the example below the first item (2) already fails the condition, so nothing is emitted.
Observable.just(2, 3, 4, 5).takeWhile(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer == 4; } }).subscribe(integer -> Log.d("JG", integer.toString()));
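The inclusive and exclusive stopping rules are easy to mix up, so here is a plain-Java sketch of both predicate-based variants. It is a model only, assuming a finite list in place of an Observable; the `TakeSketch` class is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java model of takeUntil(predicate) vs. takeWhile(predicate). */
final class TakeSketch {

    /** takeUntil model: emit items up to and including the first one that satisfies stop. */
    static <T> List<T> takeUntil(List<T> items, Predicate<T> stop) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            out.add(item);
            if (stop.test(item)) break;
        }
        return out;
    }

    /** takeWhile model: emit items while keep holds; the first failing item is not emitted. */
    static <T> List<T> takeWhile(List<T> items, Predicate<T> keep) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (!keep.test(item)) break;
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(2, 3, 4, 5);
        System.out.println(takeUntil(items, i -> i == 4)); // [2, 3, 4]
        System.out.println(takeWhile(items, i -> i == 4)); // []
        System.out.println(takeWhile(items, i -> i < 4));  // [2, 3]
    }
}
```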
- skipUntil: Discards items emitted by the Observable until a second Observable emits an item.
- skipWhile: Discards items emitted by the Observable until a specified condition becomes false.
- reduce: Applies a function to each item in the sequence in turn and emits only the final result. Implemented internally by OnSubscribeReduce.
Observable.just(2, 3, 4, 5).reduce(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer sum, Integer item) { return sum + item; } }).subscribe(integer -> Log.d("JG", integer.toString()));
- collect: Collects the emitted items into a mutable data structure.
Observable.just(3, 4, 5, 6).collect(new Func0<List<Integer>>() { @Override public List<Integer> call() { return new ArrayList<Integer>(); } }, new Action2<List<Integer>, Integer>() { @Override public void call(List<Integer> integers, Integer integer) { integers.add(integer); } }).subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { } });
- count/countLong: Counts the number of emitted items. Internally it calls reduce.
- toList: Collects all items emitted by the source Observable into a list and emits that list.
Observable.just(2, 3, 4, 5).toList().subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { } });
- toSortedList: Collects all items emitted by the source Observable into a sorted list and emits that list.
Observable.just(6, 2, 3, 4, 5).toSortedList(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer - integer2; } }).subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("JG", integers.toString()); } });
- toMap: Converts the sequence into a Map. The key and the value can each be derived from the item.
Observable.just(6, 2, 3, 4, 5).toMap(new Func1<Integer, String>() { @Override public String call(Integer integer) { return "key:" + integer; } }, new Func1<Integer, String>() { @Override public String call(Integer integer) { return "value:" + integer; } }).subscribe(new Action1<Map<String, String>>() { @Override public void call(Map<String, String> stringStringMap) { Log.d("JG", stringStringMap.toString()); } });
- toMultimap: Similar to toMap, except that each value in the map is a collection.
- map: Applies a function to transform every item emitted by an Observable.
Observable.just(6, 2, 3, 4, 5).map(integer -> "item:" + integer).subscribe(s -> Log.d("JG", s));
- cast: Casts every item emitted by an Observable to the specified type before emitting it.
- flatMap: Transforms each item emitted by an Observable into an Observable, then flattens the items emitted by those Observables into a single Observable. Internally the results are combined with merge.
Observable.just(2, 3, 5).flatMap(new Func1<Integer, Observable<String>>() { @Override public Observable<String> call(Integer integer) { return Observable.create(subscriber -> { subscriber.onNext(integer * 10 + ""); subscriber.onNext(integer * 100 + ""); subscriber.onCompleted(); }); } }).subscribe(o -> Log.d("JG", o));
- flatMapIterable: Like flatMap, but the function produces an Iterable instead of an Observable.
Observable.just(2, 3, 5).flatMapIterable(new Func1<Integer, Iterable<String>>() { @Override public Iterable<String> call(Integer integer) { return Arrays.asList(integer * 10 + "", integer * 100 + ""); } }).subscribe(new Action1<String>() { @Override public void call(String s) { } });
- concatMap: Similar to flatMap, but because the results are combined with concat internally, items are emitted in order.
- switchMap: Similar to flatMap, it transforms each item emitted by an Observable into an Observable. Whenever the source Observable emits a new item, it unsubscribes from the Observable generated for the previous item.
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 1; i < 4; i++) { subscriber.onNext(i); Utils.sleep(500, subscriber); } subscriber.onCompleted(); } }).subscribeOn(Schedulers.newThread()).switchMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer integer) { return Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(integer * 10); Utils.sleep(500, subscriber); subscriber.onNext(integer * 100); subscriber.onCompleted(); } }).subscribeOn(Schedulers.newThread()); } }).subscribe(s -> Log.d("JG", s.toString()));
- scan: Much like reduce, it applies a function to each item emitted by an Observable in turn, but it emits every intermediate result rather than only the final one.
Observable.just(2, 3, 5).scan(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer sum, Integer item) { return sum + item; } }).subscribe(integer -> Log.d("JG", integer.toString()));
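The relationship between scan and reduce can be shown with a short plain-Java model: scan yields the running accumulation, and reduce is its last element. This is a sketch, not the RxJava implementation; the `ScanSketch` class is hypothetical, and returning an empty Optional for an empty input is a choice made for this model only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

/** Plain-Java model of scan vs. reduce without a seed value. */
final class ScanSketch {

    /** scan model: the first item passes through unchanged, then each running result is emitted. */
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : items) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    /** reduce model: only the last running result (empty Optional for an empty input, in this model). */
    static <T> Optional<T> reduce(List<T> items, BinaryOperator<T> accumulator) {
        List<T> running = scan(items, accumulator);
        return running.isEmpty() ? Optional.empty() : Optional.of(running.get(running.size() - 1));
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(2, 3, 5);
        System.out.println(scan(items, Integer::sum));   // [2, 5, 10]
        System.out.println(reduce(items, Integer::sum)); // Optional[10]
    }
}
```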
- groupBy: Splits an Observable into a set of Observables by grouping the items emitted by the source Observable by key. Each resulting Observable emits the items of one group.
Observable.just(2, 3, 5, 6).groupBy(new Func1<Integer, String>() { @Override public String call(Integer integer) { return integer % 2 == 0 ? "even" : "odd"; } }).subscribe(new Action1<GroupedObservable<String, Integer>>() { @Override public void call(GroupedObservable<String, Integer> o) { o.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("JG", o.getKey() + ":" + integer.toString()); } }); } });
- buffer: Periodically collects items from the Observable into a collection and emits the collections as bundles, rather than emitting the items one at a time.
Observable.just(2, 3, 5, 6).buffer(3).subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { } });
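For the count-based form used above, the bundling can be sketched in plain Java as splitting a list into chunks. This is a model of buffer(count) only, assuming a finite source; the `BufferSketch` class is hypothetical and the time-based variants are not covered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of buffer(count): consecutive items are bundled into lists of at most count items. */
final class BufferSketch {

    static <T> List<List<T>> buffer(List<T> items, int count) {
        if (count <= 0) throw new IllegalArgumentException("count must be positive");
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < items.size(); start += count) {
            int end = Math.min(start + count, items.size());
            // The last bundle may be smaller: it holds whatever is left when the source completes.
            out.add(new ArrayList<>(items.subList(start, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(buffer(Arrays.asList(2, 3, 5, 6), 3)); // [[2, 3, 5], [6]]
    }
}
```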
- window: Periodically splits the items from an Observable into Observable windows and emits those windows, rather than emitting the items one at a time.
Observable.just(2, 3, 5, 6).window(3).subscribe(new Action1<Observable<Integer>>() { @Override public void call(Observable<Integer> integerObservable) { integerObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { } }); } });
- onErrorResumeNext: Switches to a fallback Observable when the source Observable encounters an error.
Observable.just(1, "2", 3).cast(Integer.class).onErrorResumeNext(Observable.just(1, 2, 3)).subscribe(integer -> Log.d("JG", integer.toString()));
- onExceptionResumeNext: Switches to a fallback Observable when the source Observable encounters an exception. It is similar to onErrorResumeNext; the difference is that onErrorResumeNext handles every error (any Throwable), whereas onExceptionResumeNext handles only Exceptions.
- onErrorReturn: Emits a specific item when the source Observable encounters an error.
Observable.just(1, "2", 3).cast(Integer.class).onErrorReturn(new Func1<Throwable, Integer>() { @Override public Integer call(Throwable throwable) { return 4; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("JG", integer.toString()); /* logs 1, then 4 */ } });
- retry: Resubscribes to the source Observable when it encounters an error.
Observable.just(1, "2", 3).cast(Integer.class).retry(3).subscribe(integer -> Log.d("JG", integer.toString()), throwable -> Log.d("JG", "onError"));
- retryWhen: When the source Observable encounters an error, passes the error to another Observable, which decides whether to resubscribe to the source. retryWhen is called internally.
Observable.just(1, "2", 3).cast(Integer.class).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> observable) { return Observable.timer(1, TimeUnit.SECONDS); } }).subscribe(integer -> Log.d("JG", integer.toString()), throwable -> Log.d("JG", "onError"));
A ConnectableObservable is similar to an ordinary Observable, except that it does not start emitting items when it is subscribed to, only when its connect() method is called. This lets you wait until all intended subscribers have subscribed before any items are emitted.
- ConnectableObservable.connect(): Instructs a connectable Observable to begin emitting items.
- Observable.publish(): Converts an ordinary Observable into a connectable Observable.
- Observable.replay(): Ensures that all subscribers see the same sequence of items, even if they subscribe after the Observable has begun emitting.
- ConnectableObservable.refCount(): Makes a connectable Observable behave like an ordinary Observable.
ConnectableObservable<Integer> co = Observable.just(1, 2, 3).publish(); co.subscribe(integer -> Log.d("JG", integer.toString())); co.connect();
BlockingObservable is a blocking Observable. An ordinary Observable can be converted to a BlockingObservable with either Observable.toBlocking() or BlockingObservable.from(). Internally, blocking is implemented with a CountDownLatch.
The following operators can be used with BlockingObservable. If you have an ordinary Observable, convert it with Observable.toBlocking() first; otherwise you will not get the expected results.
- forEach: Invokes a method on each item emitted by the BlockingObservable, blocking until the Observable completes.
Observable.just(2, 3).observeOn(Schedulers.newThread()).toBlocking().forEach(integer -> { Log.d("JG", integer.toString() + " " + Thread.currentThread().getName()); Utils.sleep(500); }); Log.d("JG", Thread.currentThread().getName());
- first/firstOrDefault/last/lastOrDefault: These operators were introduced earlier. They can also be used as blocking operations.
- single/singleOrDefault: If the Observable terminates after emitting exactly one value, returns that value; otherwise throws an exception or returns the default value.
- mostRecent: Returns an Iterable that always returns the most recent item emitted by the Observable.
- next: Returns an Iterable that blocks until the Observable emits another item, then returns that item.
- latest: Returns an Iterable that blocks until the Observable emits an item that the Iterable has not yet returned, then returns that item.
- toFuture: Converts an Observable into a Future.
- toIterable: Converts an Observable that emits a sequence of items into an Iterable.
- getIterator: Converts an Observable that emits a sequence of items into an Iterator.
- materialize: Converts each item and notification emitted by an Observable into a Notification object and emits those objects.
Observable.just(1, 2, 3).materialize().subscribe(new Action1<Notification<Integer>>() { @Override public void call(Notification<Integer> notification) { Log.d("JG", notification.getKind() + " " + notification.getValue()); } });
- dematerialize: The reverse of materialize; converts the Notification objects back into the items and notifications of an Observable.
- timestamp: Attaches a timestamp to each item emitted by an Observable.
Observable.just(1, 2, 3).timestamp().subscribe(new Action1<Timestamped<Integer>>() { @Override public void call(Timestamped<Integer> timestamped) { Log.d("JG", timestamped.getTimestampMillis() + " " + timestamped.getValue()); } });
- timeInterval: Attaches to each item emitted by an Observable the time elapsed since the previous item. Implemented in OperatorTimeInterval.
- serialize: Forces an Observable to emit its items and notifications serially so that it behaves correctly.
- cache: Caches the sequence of items emitted by an Observable and emits the same sequence to subsequent subscribers.
- observeOn: Specifies the Scheduler on which an observer observes the Observable.
- subscribeOn: Specifies the Scheduler on which the Observable performs its work.
- doOnEach: Registers an action to run for each item or notification emitted by an Observable.
Observable.just(2, 3).doOnEach(new Action1<Notification<? super Integer>>() { @Override public void call(Notification<? super Integer> notification) { Log.d("JG", "--doOnEach--" + notification.toString()); } }).subscribe(integer -> Log.d("JG", integer.toString()));
- doOnCompleted: Registers an action to run when the Observable completes normally.
- doOnError: Registers an action to run when the Observable terminates with an error.
- doOnTerminate: Registers an action to run when the Observable terminates, whether normally or with an error.
Observable.just(2, 3).doOnTerminate(new Action0() { @Override public void call() { Log.d("JG", "--doOnTerminate--"); } }).subscribe(integer -> Log.d("JG", integer.toString()));
- doOnSubscribe: Registers an action to run when an observer subscribes. Implemented internally by OperatorDoOnSubscribe.
- doOnUnsubscribe: Registers an action to run when an observer unsubscribes. Implemented internally by OperatorDoOnUnsubscribe, whose call method adds an unsubscribe action.
- finallyDo/doAfterTerminate: Registers an action to run after the Observable has terminated.
Observable.just(2, 3).doAfterTerminate(new Action0() { @Override public void call() { Log.d("JG", "--doAfterTerminate--"); } }).subscribe(integer -> Log.d("JG", integer.toString()));
- delay: Delays the emissions of an Observable. Each item is held back for the specified period before being emitted, so the whole sequence is shifted later in time by that amount (except for onError, which is delivered immediately).
- delaySubscription: Delays the handling of the subscription request. Implemented in OnSubscribeDelaySubscription.
- using: Creates a resource that exists only for the lifetime of the Observable and is released automatically when the Observable terminates.
Observable.using(new Func0<File>() { @Override public File call() { File file = new File(getCacheDir(), "a.txt"); if (!file.exists()) { try { Log.d("JG", "--create--"); file.createNewFile(); } catch (IOException e) { e.printStackTrace(); } } return file; } }, new Func1<File, Observable<String>>() { @Override public Observable<String> call(File file) { return Observable.just(file.exists() ? "exist" : "no exist"); } }, new Action1<File>() { @Override public void call(File file) { if (file != null && file.exists()) { Log.d("JG", "--delete--"); file.delete(); } } }).subscribe(s -> Log.d("JG", s));
- single/singleOrDefault: Requires the Observable to emit exactly one item and emits that item; otherwise it raises an error or emits the default value.
That covers the operators in the RxJava standard library; this post is meant only as a memo. Please point out any mistakes.