Preface
In my previous article, I introduced a new state management approach called BLoC, which does a great job of separating our UI logic from our business logic. But at the end we found a problem.
Stream is great, but not powerful enough
So today I would like to briefly introduce RxDart, the Dart implementation of ReactiveX. It greatly extends what a Stream can do and makes BLoC easier to use.
Before we begin, I hope you have read and understood streams, because the rest of this article builds on them. If you have not, please read Dart: What is a Stream first.
RxDart
What is ReactiveX
ReactiveX is a powerful library for writing asynchronous and event-based programs using observable sequences. It cuts through language and platform constraints and makes writing asynchronous programs as easy as walking in your garden. I’m sure you’ll love it!
The basic concept
In Dart: What is a Stream, I used a model to explain what happens inside a Stream. Today we will use that model again to see what it looks like in RxDart.
The difference is that the intermediate processing is now completely asynchronous, so we cannot return a value immediately. We do not know when the Stream will emit a result, so an observer has to wait at the exit.
When an event or a piece of data flows out, the observer captures it and handles it.
- Subject implements and extends StreamController and follows all of the StreamController specifications. If you were using a StreamController before, you can simply replace it with a Subject. You can think of it as a StreamController.
- Observable implements and extends Stream. It combines the commonly used Stream and StreamTransformer features into one convenient API. You can think of it as a Stream.
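The observer idea described above can be sketched with nothing but dart:async. This is a minimal illustration, not RxDart's implementation: the helper name observe and the doubling step are assumptions made up for the example.

```dart
import 'dart:async';

/// Hypothetical helper: processes [source] and records everything the
/// observer sees at the exit of the stream.
Future<List<String>> observe(Stream<int> source) async {
  final log = <String>[];
  final finished = Completer<void>();

  source
      .map((n) => n * 2) // intermediate processing
      .listen(
        (value) => log.add('data: $value'),
        onError: (Object error) => log.add('error: $error'),
        onDone: () {
          log.add('done');
          finished.complete();
        },
      );

  await finished.future;
  return log;
}

Future<void> main() async {
  final controller = StreamController<int>();
  final result = observe(controller.stream);
  controller
    ..add(1)
    ..add(2);
  await controller.close();
  print(await result); // [data: 2, data: 4, done]
}
```

The observer never asks the stream for a value. It only reacts when data, an error, or the done signal arrives.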
Observable
Create an Observable
You can wrap a Stream directly as an Observable:
var obs = Observable(Stream.fromIterable([1, 2, 3, 4, 5]));
obs.listen(print);
Output: 1, 2, 3, 4, 5
Create from a Future: fromFuture
var obs = Observable.fromFuture(Future.value("Hello"));
obs.listen(print);
Output: Hello
Create from an Iterable: fromIterable
var obs = Observable.fromIterable([1, 2, 3, 4, 5]);
obs.listen(print);
Output: 1, 2, 3, 4, 5
Emit items at an interval: interval
The interval method makes a stream wait for a given duration after emitting one item before it emits the next.
var obs = Observable(Stream.fromIterable([1, 2, 3, 4, 5]))
    .interval(Duration(seconds: 1));
obs.listen(print);
Output: 1 … 2 … 3 … 4 … 5
Here, … stands for a one-second pause.
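To see roughly what such a pause involves, here is a sketch that spaces out events using only dart:async. The helper spaced is hypothetical and is not how RxDart implements interval. It relies on Stream.asyncMap, which waits for each returned Future before handling the next event.

```dart
import 'dart:async';

/// Hypothetical helper: re-emits each event of [source] after waiting [gap].
/// asyncMap handles events one at a time, so the waits add up.
Stream<T> spaced<T>(Stream<T> source, Duration gap) =>
    source.asyncMap((event) => Future<T>.delayed(gap, () => event));

Future<void> main() async {
  final watch = Stopwatch()..start();
  final stream = spaced(
    Stream.fromIterable([1, 2, 3]),
    const Duration(milliseconds: 200),
  );
  await for (final n in stream) {
    // Each item arrives roughly 200 ms after the previous one.
    print('${watch.elapsedMilliseconds} ms: $n');
  }
}
```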
Process each item: map
The map method lets us process each item in turn and return a new item.
var obs = Observable(Stream.fromIterable([1, 2, 3, 4, 5]))
    .map((item) => item + 1);
obs.listen(print);
Output: 2 3 4 5 6
Expand a stream: expand
The expand method lets us expand each item into any number of items.
var obs = Observable(Stream.fromIterable([1, 2, 3, 4, 5]))
    .expand((item) => [item, item.toDouble()]);
obs.listen(print);
Output: 1 1.0 2 2.0 3 3.0 4 4.0 5 5.0
Here we expand each item into [item, item.toDouble()]. You can expand an item into any number of items. If this is a broadcast Observable that is listened to more than once, each subscription calls the expand callback and expands the events individually.
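The number of items produced per event does not have to be fixed. The sketch below uses the plain dart:async Stream.expand, which the Observable version builds on. The helper name repeatByValue is made up for illustration.

```dart
import 'dart:async';

/// Hypothetical helper: emits each number n exactly n times,
/// so 0 contributes nothing and 3 contributes three events.
Stream<int> repeatByValue(Stream<int> source) =>
    source.expand((n) => List<int>.filled(n, n));

Future<void> main() async {
  final out = await repeatByValue(Stream.fromIterable([0, 1, 2, 3])).toList();
  print(out); // [1, 2, 2, 3, 3, 3]
}
```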
Merge streams: merge
The merge method lets us merge multiple streams into one. Note the order of the output.
var obs = Observable.merge([
  Stream.fromIterable([1, 2, 3]),
  Stream.fromIterable([4, 5, 6]),
  Stream.fromIterable([7, 8, 9]),
]);
obs.listen(print);
Output: 1 4 7 2 5 8 3 6 9
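The interleaved output comes from all sources being listened to at the same time. A rough dart:async sketch of that idea follows. The function mergeAll is hypothetical, it is not RxDart's implementation, and it ignores pause and resume handling.

```dart
import 'dart:async';

/// Hypothetical helper: forwards events from every source as they arrive
/// and closes once all sources are done.
Stream<T> mergeAll<T>(List<Stream<T>> sources) {
  final controller = StreamController<T>();
  controller.onListen = () {
    var open = sources.length;
    if (open == 0) {
      controller.close();
      return;
    }
    // Subscribe to every source immediately, not one after another.
    final subscriptions = [
      for (final source in sources)
        source.listen(
          controller.add,
          onError: controller.addError,
          onDone: () {
            if (--open == 0) controller.close();
          },
        ),
    ];
    // If the listener cancels, stop listening to all sources.
    controller.onCancel =
        () => Future.wait(subscriptions.map((s) => s.cancel()));
  };
  return controller.stream;
}

Future<void> main() async {
  final merged = await mergeAll([
    Stream.fromIterable([1, 2, 3]),
    Stream.fromIterable([4, 5, 6]),
  ]).toList();
  // All six values arrive; their relative order depends on event timing.
  print(merged);
}
```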
Run multiple streams in sequence: concat
The concat method runs a list of streams one after another: the next stream starts only when the previous one has completed.
var obs = Observable.concat([
  Stream.fromIterable([1, 2, 3]),
  Stream.fromIterable([4, 5, 6]),
  Stream.fromIterable([7, 8, 9]),
]);
obs.listen(print);
Output: 1 2 3 4 5 6 7 8 9
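Sequential execution can be sketched in plain Dart with an async* generator. The function concatAll is a hypothetical stand-in, not RxDart's code: yield* drains one source completely before the loop moves on to the next.

```dart
import 'dart:async';

/// Hypothetical helper: emits all events of the first stream, then all
/// events of the second, and so on.
Stream<T> concatAll<T>(Iterable<Stream<T>> sources) async* {
  for (final source in sources) {
    yield* source; // waits until this source is done
  }
}

Future<void> main() async {
  final out = await concatAll([
    Stream.fromIterable([1, 2, 3]),
    Stream.fromIterable([4, 5, 6]),
    Stream.fromIterable([7, 8, 9]),
  ]).toList();
  print(out); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
}
```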
Check every item: every
The every method checks whether each item satisfies a condition. It returns an AsObservableFuture<bool>, which can be converted back into an Observable.
var obs = Observable.fromIterable([1, 2, 3, 4, 5]);
obs.every((x) => x < 10).asObservable().listen(print);
Output: true
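The same round trip exists in plain dart:async, where Stream.every returns a Future<bool> and Future.asStream turns it back into a stream. The sketch below is an analogy to asObservable, not the RxDart API itself, and the helper name allBelow is an assumption.

```dart
import 'dart:async';

/// Hypothetical helper: a stream that emits one bool, telling whether
/// every event of [source] is below [limit].
Stream<bool> allBelow(Stream<int> source, int limit) =>
    source.every((x) => x < limit).asStream();

Future<void> main() async {
  allBelow(Stream.fromIterable([1, 2, 3, 4, 5]), 10).listen(print); // true
  allBelow(Stream.fromIterable([1, 20, 3]), 10).listen(print); // false
}
```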
Here’s what you need to know about Observables
- Observables in Dart are single-subscription by default. If you try to listen to one twice, a StateError is thrown. You can use a factory method or asBroadcastStream to turn it into a multi-subscription (broadcast) stream.
var obs = Observable(Stream.fromIterable([1, 2, 3, 4, 5])).asBroadcastStream();
- Many methods do not return a Single or an Observable but a Dart Future. Fortunately, it is easy to convert the result back into a stream.
- A Dart Stream does not close by default when an error occurs. In standard Rx, an error terminates the Observable unless an operator intercepts it.
- Streams in Dart are asynchronous by default, while Observables in standard Rx are synchronous by default.
- When a broadcast Observable has several listeners, onListen is called only the first time it is listened to. The subscribers do not interfere with each other.
var obs = Observable(Stream.fromIterable([1, 2, 3, 4, 5])).asBroadcastStream();
// The first subscriber
obs.interval(Duration(seconds: 1)).map((item) => item + 1).listen(print);
// The second subscriber
obs.listen(print);
Output: 1 2 3 4 5 2 3 4 5 6
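The single-subscription rule can be checked with dart:async alone. In this sketch both helper names are hypothetical, and a plain StreamController stands in for the Observable's source.

```dart
import 'dart:async';

/// Returns true if listening twice to a single-subscription stream throws.
bool secondListenThrows() {
  final controller = StreamController<int>();
  controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {}); // second listener is not allowed
    return false;
  } on StateError {
    return true;
  }
}

/// Feeds [values] to two listeners through asBroadcastStream.
Future<List<List<int>>> listenTwice(List<int> values) async {
  final controller = StreamController<int>();
  final shared = controller.stream.asBroadcastStream();
  final first = shared.toList(); // listener 1
  final second = shared.toList(); // listener 2
  values.forEach(controller.add);
  final closed = controller.close();
  final results = await Future.wait([first, second]);
  await closed;
  return results;
}

Future<void> main() async {
  print(secondListenThrows()); // true
  print(await listenTwice([1, 2, 3])); // [[1, 2, 3], [1, 2, 3]]
}
```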
These are some of the commonly used Observable methods; the list is not complete. I will keep updating this article and cover the rest in the future.
An enhanced StreamController: Subject
A plain broadcast stream controller: PublishSubject
PublishSubject is a plain broadcast StreamController that can be listened to many times. Its sync parameter defaults to false, which means it is backed by an asynchronous broadcast controller (AsyncBroadcastStreamController).
A broadcast stream controller that caches the latest event: BehaviorSubject
BehaviorSubject is also a broadcast stream, but it records the latest event and sends it as the first event to every new listener.
Remember the small problem at the start of this article? When page B starts listening again, it does not receive the latest event, so it has to wait until the stream fires again to get the correct value.
Now let us replace our StreamController with a BehaviorSubject:
// var _countController = StreamController<int>.broadcast();
var _subject = BehaviorSubject<int>();
It’s really that simple, seamless replacement 😆
The code has been uploaded to GitHub. Let's take a look at the effect.
Case 1
final subject = BehaviorSubject<int>();
subject.add(1);
subject.add(2);
subject.add(3);
subject.stream.listen(print); // prints 3
subject.stream.listen(print); // prints 3
subject.stream.listen(print); // prints 3
Output: 3, 3, 3
Since we start listening after add(3), each listener receives the latest value.
Case 2
final subject = BehaviorSubject<int>(seedValue: 1);
subject.stream.listen(print); // prints 1
subject.stream.listen(print); // prints 1
subject.stream.listen(print); // prints 1
Output: 1 1 1
seedValue is the initial value. Until something else is added, it is treated as the latest value and sent to every new listener.
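To make the caching behavior concrete, here is a small hand-rolled sketch built on dart:async. It is not RxDart's BehaviorSubject: the class LatestValueSubject and its seeded constructor are invented for illustration, and the sketch only shows the idea of replaying the latest value before forwarding live events.

```dart
import 'dart:async';

/// Hypothetical stand-in for the "cache the latest event" idea.
class LatestValueSubject<T> {
  LatestValueSubject();
  LatestValueSubject.seeded(T seed)
      : _latest = seed,
        _hasValue = true;

  final StreamController<T> _controller = StreamController<T>.broadcast();
  T? _latest;
  bool _hasValue = false;

  void add(T value) {
    _latest = value;
    _hasValue = true;
    _controller.add(value);
  }

  /// Every new listener first gets the cached value, then live events.
  Stream<T> get stream => Stream<T>.multi((listener) {
        if (_hasValue) listener.add(_latest as T);
        final subscription = _controller.stream.listen(
          listener.add,
          onError: listener.addError,
          onDone: listener.close,
        );
        listener.onCancel = subscription.cancel;
      });

  Future<void> close() => _controller.close();
}

Future<void> main() async {
  final subject = LatestValueSubject<int>();
  subject
    ..add(1)
    ..add(2)
    ..add(3);
  final lateListener = subject.stream.toList(); // starts after add(3)
  subject.add(4);
  await subject.close();
  print(await lateListener); // [3, 4]
}
```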
A broadcast stream controller that caches more events: ReplaySubject
ReplaySubject can cache more values. By default it caches every value and replays the recorded events to each new listener before anything else.
final subject = ReplaySubject<int>();
subject.add(1);
subject.add(2);
subject.add(3);
subject.stream.listen(print); // prints 1, 2, 3
subject.stream.listen(print); // prints 1, 2, 3
subject.stream.listen(print); // prints 1, 2, 3
Output: 1 1 1 2 2 2 3 3 3
You can also limit the number of cached events with maxSize:
final subject = ReplaySubject<int>(maxSize: 2);
subject.add(1);
subject.add(2);
subject.add(3);
subject.stream.listen(print); // prints 2, 3
subject.stream.listen(print); // prints 2, 3
subject.stream.listen(print); // prints 2, 3
Output: 2 2 2 3 3 3
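The effect of maxSize can be pictured as a bounded queue in front of a broadcast controller. The sketch below is a hypothetical illustration using dart:async and dart:collection, not RxDart's ReplaySubject. The class name BoundedReplay is made up.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical stand-in: remembers up to [maxSize] events (all events when
/// maxSize is null) and replays them to each new listener.
class BoundedReplay<T> {
  BoundedReplay({this.maxSize});

  final int? maxSize;
  final Queue<T> _buffer = ListQueue<T>();
  final StreamController<T> _controller = StreamController<T>.broadcast();

  void add(T value) {
    _buffer.addLast(value);
    final limit = maxSize;
    if (limit != null && _buffer.length > limit) {
      _buffer.removeFirst(); // drop the oldest cached event
    }
    _controller.add(value);
  }

  Stream<T> get stream => Stream<T>.multi((listener) {
        _buffer.forEach(listener.add); // replay the cache first
        final subscription = _controller.stream.listen(
          listener.add,
          onError: listener.addError,
          onDone: listener.close,
        );
        listener.onCancel = subscription.cancel;
      });

  Future<void> close() => _controller.close();
}

Future<void> main() async {
  final replay = BoundedReplay<int>(maxSize: 2);
  replay
    ..add(1)
    ..add(2)
    ..add(3);
  final listener = replay.stream.toList();
  await replay.close();
  print(await listener); // [2, 3]
}
```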
Customize your Subject
You can get more specialized behavior by extending the Subject class with a Subject of your own. I won't give an example here. 😝
Releasing a Subject
Be sure to release a Subject when you no longer listen to it or no longer use it. You can call the subscription's cancel() method to stop a single listener, or subject.close() to close the whole stream.
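The difference between the two calls can be shown with a plain broadcast StreamController, which a Subject extends. This is a minimal sketch with a made-up helper name, listenThenRelease.

```dart
import 'dart:async';

/// Hypothetical helper: listens, cancels the listener, then closes the stream.
Future<List<int>> listenThenRelease() async {
  final controller = StreamController<int>.broadcast();
  final received = <int>[];
  final subscription = controller.stream.listen(received.add);

  controller.add(1);
  await Future<void>.delayed(Duration.zero); // let the event be delivered

  await subscription.cancel(); // only this listener stops receiving events
  controller.add(2); // no listeners are left, so the event is dropped

  await controller.close(); // the whole stream is finished
  // Calling controller.add(3) here would throw a StateError.
  return received;
}

Future<void> main() async {
  print(await listenThenRelease()); // [1]
}
```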
To learn more
Here are some excellent articles for further reading:
- RxDart: Magical transformations of Streams
- The ramen translation above
- ReactiveX website
Final words
This is the end of the RxDart article. It covers only some of RxDart's features; I will keep improving it until it is complete.
RxDart is powerful enough to let you handle a large number of asynchronous events with ease. I'm sure every developer will love this great library once they get to know it.
If you have any ideas or questions about using RxDart, please leave them in the comments below or email me at [email protected], and I will reply within 24 hours!
The next article will be a summary of Flutter state management, so stay tuned.