preface

In my previous article, I introduced a new state management approach called BLoC, which does a great job of separating our UI logic from our business logic. But at the end we found a problem.

Stream is great, but not powerful enough

So today I would like to briefly introduce the DART implementation of ReactiveX — RxDart, which greatly expands the functions of Stream and makes it easier to use Bloc.

Before we begin, I hope you have read and understood the stream, and the rest of the article is based on that. Dart: What is a Stream if you haven’t already read dart: What is a Stream.

RxDart

What is ReactiveX

ReactiveX is a powerful library for writing asynchronous and event-based programs using observable sequences. It cuts through language and platform constraints and makes writing asynchronous programs as easy as walking in your garden. I’m sure you’ll love it!

The basic concept

In Dart: What is a Stream, I used a model to understand what happens inside a Stream. Today we’re going to use this model again to see what it looks like in RXDart.

But in this case, when we do intermediate processing, we are completely asynchronous, meaning that we cannot return a value immediately. We don’t know when the Stream will spit out the result, so an observer must guard the exit.

When an event/data flows out, the observer captures the event and parses it.

  • Subject implements and extends StreamController, which complies with all StreamController specifications. If you were using StreamController before, you could simply replace it with Subject. You can think of it as a streamController.
  • Observable implements and extends Stream. It combines commonly used Stream and streamTransformer into a very useful API. You can think of it as a stream.

Observable — Observable

Create Observavle

You can wrap a Stream directly as an Observable

  var obs = Observable(Stream.fromIterable([1.2.3.4.5]));
  
  obs.listen(print);
Copy the code

Output: 1, 2, 3, 4, 5

Create fromFuture: fromFuture

 var obs = Observable.fromFuture(new Future.value("Hello"));
 
  obs.listen(print); 
Copy the code

Output: Hello!

Create fromIterable fromIterable

var obs = Observable.fromInterable([1.2.3.4.5]);

obs.listen(print);
Copy the code

Output: 1, 2, 3, 4, 5

Let the flow spit out at an interval: interval

The interval method allows a stream to “spit out data” after a certain amount of time before spitting out the next data.

  var obs = Observable(Stream.fromIterable([1.2.3.4.5]))
    .interval(new Duration(seconds: 1));

  obs.listen(print);
Copy the code

Output: 1… 2… 3… 4… 5

Among them… The representative paused for a second.

Iteratively process data: map

The map method allows us to iteratively process each piece of data and return a new piece of data

 var obs = Observable(Stream.fromIterable([1.2.3.4.5]))
    .map((item)=>++item);
    
obs.listen(print);
Copy the code

Output: 2 3 4 5 6

Expand flow: expand

The expand method allows us to expand each item by as many streams

 var obs = Observable(Stream.fromIterable([1.2.3.4.5]))
   .expand((item)=> [item,item.toDouble()]);

 obs.listen(print);
Copy the code

Output: 1 1.0 2 2.0 3 3.0 4 4.0 5 5.0

Here we’re expanding each data to [item, item.todouble]. You can expand to any set of streams. If this is a broadcast Observable that gets listened to multiple times, it can call expand and expand itself individually.

Merge flows: merge

The merge method allows us to merge multiple streams. Note the output.

  var obs = Observable.merge([
    Stream.fromIterable([1.2.3]),
    Stream.fromIterable([4.5.6]),
    Stream.fromIterable([7.8.9]),]); obs.listen(print);
Copy the code

Output: 1 4 7 2 5 8 3 6 9

Execute multiple streams sequentially: concat

The concat method allows us to execute a set of streams sequentially, and when one set of streams is complete, the next set can be executed.

  var obs = Observable.concat([
    Stream.fromIterable([1.2.3]),
    Stream.fromIterable([4.5.6]),
    Stream.fromIterable([7.8.9]),]); obs.listen(print);
Copy the code

Output: 1 2 3 4 5 6 7 8 9

Check every item: every

Every checks each item for compliance, and returns an AsObservableFuture< bool> that can be converted to Observable.

  var obs = Observable.fromIterable([1.2.3.4.5]);

  obs.every((x)=> x < 10).asObservable().listen(print);
Copy the code

Output: true

Here’s what you need to know about Observables

  • Observables in Dart is a single subscription by default. If you try to listen to it twice, a StateError is thrown. You can use the factory method or asBroadcastStream to convert it to multiple subscription streams.
  var obs = Observable(Stream.fromIterable([1.2.3.4.5])).asBroadcastStream();
Copy the code
  • Many methods return values that are not Single or Observable but must return a Dart Future. Fortunately, you can easily find ways to convert them back to a stream.
  • Stream in Dart does not close by default when an error occurs. But in Rxdart, Error causes an Observable to terminate unless it is intercepted by an operator.
  • Streams in Dart are asynchronous by default, while Observables are synchronous by default.
  • The onListen method is called only the first time when dealing with multiple observables. And each subscriber does not interfere with each other.
  var obs = Observable(Stream.fromIterable([1.2.3.4.5])).asBroadcastStream();

// The first subscriber
  obs.interval(Duration(seconds: 1)).map((item) => ++item).listen(print);
// Second subscriber
  obs.listen(print);
Copy the code

Output: 1 2 3 4 5 2 3 4 5 6

The above are some common observables, which are not complete. I will continue to update this article and fully introduce its functions in the future

Enhanced version StreamController – Subject

Ordinary Broadcast Stream Controller: PublishSubject

StreamController PublishSubject is an ordinary radio version, you can listen to many times, the default is the sync is false, that is to say there is an asynchronous broadcasting AsyncBroadcastStreamController flow.

A broadcast flow controller that caches the latest event: BehaviorSubject

The BehaviorSubject is also a broadcast flow, but it records the latest event and sends the event as the first frame to the listener when the new listener listens.

Remember that little question at the beginning of our article? When we re-listen to page B, we can’t get the latest events, so we have to wait until we refire the stream to get the correct values.

Ok, we now replace our StreamCroller with a BehaviorSubject

//var _countController = StreamController.broadcast<int>();

var _subject = BehaviorSubject<int> ();Copy the code

It’s really that simple, seamless replacement 😆

The code has been uploaded to Github, let’s take a look at the effect

Case 1

  final subject = new BehaviorSubject<int> (); subject.add(1);
  subject.add(2);
  subject.add(3);

  subject.stream.listen(print); // prints 3
  subject.stream.listen(print); // prints 3
  subject.stream.listen(print);
Copy the code

Output: 3, 3, 3

Since we started listening after Add (3), we will receive the latest value.

Case 2

  final subject = new BehaviorSubject<int>(seedValue: 1);

  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);
Copy the code

Output: 1 1 1

SeedValue is the initial value and will be sent to the listener as the last value when there is a listener.

Broadcast stream controller that caches more events: ReplaySubject

The ReplaySubject can cache more values, and by default will cache all values and send the recorded event as the first frame on a new listen.

  final subject = ReplaySubject<int> (); subject.add(1);
  subject.add(2);
  subject.add(3);
  
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);
Copy the code

Output: 1 1 1 2 2 2 3 3 3

You can also control the number of caches through maxSize

  final subject = ReplaySubject<int>(maxSize: 2);

  subject.add(1);
  subject.add(2);
  subject.add(3);

  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);
Copy the code

Output: 2, 2, 3, 3, 3

Customize your Subject

You can get more personalized functionality by customizing a new Subject to the Subject class. I’m not going to do chestnuts here. 😝

The release of the Subject

Be sure to release Subject when you no longer listen to it, or when Subject is no longer used. You can call subscription’s cancel() method to cancel a listener, or subject.close () to close the entire stream.

To learn more

Here are some excellent articles for you to refer to more

  • RxDart: Magical transformations of Streams
  • The ramen translation above
  • Reactive X website

Write in the last

This is the end of the RxDart article. It only introduces some of RxDart’s features. I will gradually improve it and finally complete it.

RxDart is powerful enough to make you feel comfortable handling a large number of asynchronous events. I’m sure every developer will love this great library once they get to know it.

If you have any good ideas or queries when using RxDART, please leave them in the comments section below and my email [email protected], and I will contact you within 24 hours!

The next article will be a summary of flutter state management, so stay tuned.