At AlloyTeam Conf 2016 I recently gave a talk titled “Building Streaming Front-end Applications with RxJS”. Talking with attendees online and offline afterwards, I found two attitudes towards RxJS: people who had used it praised the elegant coding experience it brings, while people who had not said it was too hard to get started with. In this article I therefore draw on my own understanding of RxJS and walk through its underlying principles, its basic implementation, and examples, to give an introductory guide to RxJS and a feel for what coding with it is like.

Contents

  • Implementing search the conventional way
  • Streams in RxJS
  • A brief look at how RxJS works
    • Observer pattern
    • Iterator pattern
    • RxJS: observer pattern + iterator pattern
  • RxJS basics
    • Observable
    • Observer
  • RxJS Operators
    • Getting started with Operators
    • A series of Operators
  • Implementing the search example step by step with RxJS
  • Conclusion

Implementing search the conventional way

Implementing a search feature is a familiar front-end task. The usual approach is to listen for input events on the text box, send the input to the backend, and then process the returned data and display it as the search results.

<input id="text"></input>
<script>
    var text = document.querySelector('#text');
    text.addEventListener('keyup', (e) => {
        var searchText = e.target.value;
        // Send the request
        $.ajax({
            url: `search.qq.com/${searchText}`,
            success: data => {
                // Render the data returned by the backend
                render(data);
            }
        });
    });
</script>

The code above does what we want, but it has two significant problems:

  1. Redundant requests. When you want to search for “Edison”, the input box may pass through several intermediate states on the way, for example “E”, “Edi”, and “Edison”. Each of these three states triggers a request, so 3 requests are sent and 2 of them are redundant.

  2. Stale requests still take effect. Suppose you search for “Edison” and then immediately search for “Darwin”. If the backend's results for “Edison” come back afterwards, the rendering logic runs on them and the result box shows the results for “Edison” instead of the current search, “Darwin”, which is incorrect.

To reduce redundant requests, you can use setTimeout to debounce the handler (this is often loosely called function throttling). The core code is as follows:

<input id="text"></input>
<script>
    var text = document.querySelector('#text'),
        timer = null;
    text.addEventListener('keyup', (e) => {
        // If another input arrives within 250 ms, clear the previous timer
        clearTimeout(timer);
        // Timer that fires after 250 ms
        timer = setTimeout(() => {
            console.log('initiate a request..');
        }, 250);
    });
</script>
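The same timer trick can be pulled out into a reusable helper. The following is only a minimal sketch: the names debounce, fakeTimers, and flush are made up for illustration, and the optional timers argument is my own assumption, added so that the behaviour can be observed without actually waiting 250 ms.

```javascript
// Hypothetical helper: returns a debounced version of `fn`.
// `timers` can be swapped out so the behaviour can be checked without waiting.
function debounce(fn, delay, timers) {
    timers = timers || {
        set: (callback, ms) => setTimeout(callback, ms),
        clear: id => clearTimeout(id)
    };
    let timer = null;
    return function (...args) {
        timers.clear(timer);                           // drop the pending call, if any
        timer = timers.set(() => fn(...args), delay);  // schedule the latest call
    };
}

// A fake clock for the demo: callbacks wait in `pending` until flush() runs them.
const pending = new Map();
let nextId = 1;
const fakeTimers = {
    set(callback) {
        const id = nextId++;
        pending.set(id, callback);
        return id;
    },
    clear(id) {
        pending.delete(id);
    }
};
function flush() {
    const callbacks = [...pending.values()];
    pending.clear();
    callbacks.forEach(callback => callback());
}

const requests = [];
const search = debounce(text => requests.push(text), 250, fakeTimers);
search('E');
search('Edi');
search('Edison');
flush(); // pretend 250 ms have passed without further input
console.log(requests); // [ 'Edison' ]
```

In a page, the third argument would simply be left out, so that the real setTimeout and clearTimeout are used.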

To deal with stale requests that still take effect, you can record the current search in a state variable before sending the request and have the backend return the search term together with the results. The front end then checks whether the returned data matches the current search and runs the rendering logic only if it does. The final code is:

<input id="text"></input>
<script>
    var text = document.querySelector('#text'),
        timer = null,
        currentSearch = '';

    text.addEventListener('keyup', (e) => {
        clearTimeout(timer);
        timer = setTimeout(() => {
            var searchText = e.target.value;
            // Record the current search in a state variable
            currentSearch = searchText;

            $.ajax({
                url: `search.qq.com/${searchText}`,
                success: data => {
                    // Check that the search term returned by the backend
                    // matches the current search before rendering
                    if (data.search === currentSearch) {
                        // Render
                        render(data);
                    } else {
                        // ..
                    }
                }
            });
        }, 250);
    });
</script>

The code above works well enough, but it is starting to look messy. Let's implement the same functionality with RxJS:

var text = document.querySelector('#text');
var inputStream = Rx.Observable.fromEvent(text, 'keyup')
                    .debounceTime(250)
                    .pluck('target', 'value')
                    .switchMap(url => Http.get(url))
                    .subscribe(data => render(data));

Clearly, the RxJS-based implementation is much more concise!

Streams in RxJS

RxJS is short for Reactive Extensions for JavaScript. It originates from Reactive Extensions and is a library for asynchronous programming based on observable data streams. RxJS is the JavaScript implementation of Reactive Extensions; implementations also exist for other languages and platforms, such as RxJava, RxAndroid, and RxSwift. To learn RxJS, we need to start with the observable stream, an important data type in Rx.

A stream is a sequence of events that occur over time. It carries the notions of time and of responding to events.

When it rains, raindrops are produced over time, and each one creates ripples on the surface of the water as it falls. This is much like a stream in Rx. On the Web, the raindrops might be mouse clicks, key presses, events, collections of data, and so on.

A brief look at how RxJS works

Now that we have a basic understanding of streams, let's look at how RxJS is built around that concept and at its basic implementation principles. RxJS is based on the observer pattern and the iterator pattern, applied with a functional programming mindset.

Observer pattern

On the Web, the most common use of the observer pattern is listening for and firing DOM events.

  • Subscribe: subscribe to the click event of document.body via addEventListener.
  • Publish: when the body node is clicked, the body node publishes the message to its subscribers.

document.body.addEventListener('click', function listener(e) {
    console.log(e);
}, false);

document.body.click(); // simulate a user click

Abstracted, the example above maps onto the generic observer pattern: a subject keeps a list of subscribers and notifies each of them whenever it publishes a message.
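To make the generic pattern concrete, here is a minimal sketch in plain JavaScript. The Subject class and its subscribe and publish methods are names chosen for illustration; they are not the DOM API, and they are not RxJS's Subject either.

```javascript
// A generic subject: keeps a list of listeners and notifies them on publish.
class Subject {
    constructor() {
        this.listeners = [];
    }

    // Subscribe: register a listener and return a function that removes it again.
    subscribe(listener) {
        this.listeners.push(listener);
        return () => {
            this.listeners = this.listeners.filter(l => l !== listener);
        };
    }

    // Publish: hand the message to every current listener.
    publish(message) {
        this.listeners.forEach(listener => listener(message));
    }
}

const received = [];
const body = new Subject(); // plays the role of document.body
const unsubscribe = body.subscribe(e => received.push(e));

body.publish('click 1');
unsubscribe();
body.publish('click 2'); // nobody is listening any more
console.log(received); // [ 'click 1' ]
```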

Iterator pattern

In JavaScript, the iterator pattern is expressed through the iterable protocol. The iterable protocol is not a concrete type but a protocol that objects can implement. JavaScript has built-in iterable types such as Array and Set. Calling an iterable's [Symbol.iterator]() method returns an iterator object, and each call to the iterator's next method returns an element object, as shown in the following example.

var iterable = [1, 2];

var iterator = iterable[Symbol.iterator]();

iterator.next(); // => { value: 1, done: false}
iterator.next(); // => { value: 2, done: false}

iterator.next(); // => { value: undefined, done: true}

In the element object: value indicates the return value, and done indicates whether the end has been reached.
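Because the iterable protocol is something any object can implement, we can also write an iterable of our own. The sketch below uses a made-up range object (the name and its start and end fields are assumptions for illustration only) whose [Symbol.iterator]() method returns an iterator with a next method.

```javascript
// A hypothetical iterable that counts from `start` to `end` inclusive.
const range = {
    start: 1,
    end: 3,
    [Symbol.iterator]() {
        let current = this.start;
        const end = this.end;
        return {
            // Each call returns an element object of the form { value, done }.
            next() {
                return current <= end
                    ? { value: current++, done: false }
                    : { value: undefined, done: true };
            }
        };
    }
};

// Anything that implements the protocol works with spread and for...of.
const values = [...range];
console.log(values); // [ 1, 2, 3 ]
```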

Iterators can be traversed using the following approach.

var iterable = [1, 2];
var iterator = iterable[Symbol.iterator]();

while (true) {
    let result;
    try {
        result = iterator.next(); // <= get the next value
    } catch (err) {
        handleError(err); // <= error handling
        break; // no result to inspect after an error
    }
    if (result.done) {
        handleCompleted(); // <= no more values (done)
        break;
    }
    doSomething(result.value);
}

There are three main cases:

  • Getting the next value: each call to next returns one element, so multiple values can be returned one by one.

  • No more values (done): when there are no more values, next returns an element whose done is true.

  • Error handling: when next fails, it throws an error, so the call can be wrapped in try/catch to handle possible errors.

RxJS: observer pattern + iterator pattern

RxJS has two basic concepts: Observable and Observer. An Observable is the thing being observed: a stream of values or events. An Observer is the one observing: it reacts to what the Observable emits. Observable and Observer relate through subscribe and publish (the observer pattern) as follows:

  • Subscribe: the Observer subscribes to the Observable via the subscribe() method that the Observable provides.
  • Publish: the Observable publishes events to the Observer by calling the Observer's next method.

Pseudo-code for Observable and Observer is shown below:

// Observer
var Observer = {
    next(value) {
        alert(`received ${value}`);
    }
};

// Observable
function Observable(Observer) {
    setTimeout(() => {
        Observer.next('A');
    }, 1000);
}

// subscribe
Observable(Observer);

The above is actually a representation of the observer pattern, so how is the iterator pattern represented in RxJS?

In RxJS, the Observer provides two additional methods, error() and complete(), that correspond to the iterator pattern, in addition to the next method to receive Observable events.

var Observer = {
    next(value) { /* handle the value */ },
    error(error) { /* handle the exception */ },
    complete() { /* handle the completed state */ }
};

Compared with the Iterator:

  • next(): the Observer provides a next method to receive the Observable's stream, which is a push model; an Iterator obtains values by calling iterator.next(), which is a pull model.

  • complete(): when no more values will be emitted, the Observer's complete method is called; with an Iterator, completion is signalled when next returns done as true.

  • error(): when an exception occurs while processing events, the Observer provides an error method that receives the error for unified handling; an Iterator needs a try/catch wrapper to handle possible errors.
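The pull and push models can be placed side by side in a few lines. This is only an illustrative sketch: producer, pulled, and pushed are names I made up, and the producer function is a simplified stand-in for an Observable rather than RxJS code.

```javascript
const data = [1, 2, 3];

// Pull: the consumer decides when to ask for the next value.
const pulled = [];
const iterator = data[Symbol.iterator]();
let step = iterator.next();
while (!step.done) {
    pulled.push(step.value);
    step = iterator.next();
}

// Push: the producer decides when to hand each value to the observer.
const pushed = [];
const observer = {
    next: value => pushed.push(value),
    error: err => pushed.push('error: ' + err),
    complete: () => pushed.push('complete')
};
function producer(obs) {
    try {
        data.forEach(value => obs.next(value));
        obs.complete(); // plays the role of done: true
    } catch (err) {
        obs.error(err); // plays the role of the consumer's try/catch
    }
}
producer(observer);

console.log(pulled); // [ 1, 2, 3 ]
console.log(pushed); // [ 1, 2, 3, 'complete' ]
```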

The following pseudo-code shows Observable and Observer implementing the observer + iterator pattern. Data being passed along step by step, and having an effect at each step, is exactly how a stream behaves.

// Observer
var Observer = {
    next(value) { alert(`received ${value}`); },
    error(error) { alert(`received ${error}`); },
    complete() { alert("complete"); }
};

// Observable
function Observable(Observer) {
    [1, 2, 3].forEach(item => {
        Observer.next(item);
    });

    Observer.complete();
    // Observer.error("error message");
}

// subscribe
Observable(Observer);

RxJS basics

With these concepts and the pseudo-code in mind, how do we create an Observable and an Observer in RxJS?

Creating an Observable

RxJS provides the create method for defining a custom Observable, inside which values can be emitted with next.

var Observable = Rx.Observable.create(observer => {
    observer.next(2);
    observer.complete();
    return  () => console.log('disposed');
});

Creating an Observer

An Observer can declare next, error, and complete methods to handle the different states of the stream.

// A plain object with next, error, and complete methods can serve as the Observer
var Observer = {
    next: x => console.log('Next:', x),
    error: err => console.log('Error:', err),
    complete: () => console.log('Completed')
};

Finally, the Observable and the Observer are connected through subscribe.

var subscription = Observable.subscribe(Observer);

In RxJS, a subscription to a stream can be cancelled. Calling subscribe returns a subscription, and calling subscription.unsubscribe() cancels it.
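To see how subscribe and unsubscribe can fit together, here is a minimal stand-in for an Observable written in plain JavaScript. It is a sketch under simplifying assumptions, not RxJS's real implementation: MiniObservable and emit are made-up names, and unlike RxJS this version runs the teardown function only on unsubscribe.

```javascript
// Minimal stand-in for an Observable; not RxJS's real implementation.
class MiniObservable {
    // `producer` receives an observer and may return a teardown function.
    constructor(producer) {
        this.producer = producer;
    }

    subscribe(observer) {
        let closed = false;
        // Wrap the observer so that nothing is delivered once the stream is closed.
        const safeObserver = {
            next: value => {
                if (!closed) observer.next(value);
            },
            error: err => {
                if (closed) return;
                closed = true;
                if (observer.error) observer.error(err);
            },
            complete: () => {
                if (closed) return;
                closed = true;
                if (observer.complete) observer.complete();
            }
        };
        const teardown = this.producer(safeObserver);
        return {
            unsubscribe() {
                closed = true;
                if (typeof teardown === 'function') teardown();
            }
        };
    }
}

const log = [];
let emit; // lets the example push values from outside the producer
const stream = new MiniObservable(observer => {
    emit = value => observer.next(value);
    return () => log.push('disposed');
});

const subscription = stream.subscribe({ next: v => log.push(v) });
emit(1);
subscription.unsubscribe();
emit(2); // ignored: the subscription has been cancelled
console.log(log); // [ 1, 'disposed' ]
```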

Looks complicated, doesn't it? Let's write it in a different form:

// @Observable: create an Observable
var streamA = Rx.Observable.of(2);

// @Observer: streamA.subscribe(Observer)
streamA.subscribe(v => console.log(v));

Written as a chain, the code becomes very succinct:

Rx.Observable.of(2).subscribe(v => console.log(v));

RxJS Operators

Getting started with Operators

Rx.Observable.of(2).subscribe(v => console.log(v));

The code above creates a stream containing the value 2 and eventually prints 2. What if we want to double it and print 4?

Option 1: change the event source so that the Observable emits the value × 2

Rx.Observable.of(2 * 2 /* <= */).subscribe(v => console.log(v));

Option 2: change the response so that the Observer multiplies the value by 2

Rx.Observable.of(2).subscribe(v => console.log(v * 2 /* <= */));

The elegant option: RxJS provides a clean way to insert operations on the stream between the Observable and the Observer.

Rx.Observable.of(2)
             .map(v => v * 2) /* <= */
             .subscribe(v => console.log(v));

The map operator works like the array map method, except that here it transforms a stream and passes the new stream on. In RxJS, operations of this kind on streams are called Operators. RxJS provides a whole series of Operators, such as map, reduce, and filter. An operator produces a new stream rather than modifying the source, which keeps streams immutable; this is one place where RxJS reflects functional programming. I won't go into functional programming here; if you are interested, see another article, “Talking about Functional Programming.”
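The idea that an operator returns a new stream and leaves the source untouched can be sketched with the function-style pseudo-code used earlier, where a stream is simply a function that takes an observer. The of and map functions below are simplified stand-ins written for this illustration, not the RxJS implementations.

```javascript
// A stream here is just a function that takes an observer.
function of(...values) {
    return observer => {
        values.forEach(value => observer.next(value));
        observer.complete();
    };
}

// map returns a NEW stream; the source stream is left untouched.
function map(source, project) {
    return observer => source({
        next: value => observer.next(project(value)),
        error: err => observer.error(err),
        complete: () => observer.complete()
    });
}

const source = of(2);
const doubled = map(source, v => v * 2);

const fromSource = [];
const fromDoubled = [];
source({ next: v => fromSource.push(v), error() {}, complete() {} });
doubled({ next: v => fromDoubled.push(v), error() {}, complete() {} });

console.log(fromSource);  // [ 2 ]
console.log(fromDoubled); // [ 4 ]
```

Subscribing to source still yields 2 after doubled has been created, which is the immutability described above.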

So from the moment it is produced to the moment it is finally handled, a stream may pass through a series of operations. In RxJS, an Observable passes through a series of Operators before reaching the Observer.

          Operator1   Operator2
Observable ----|-----------|-------> Observer

A series of Operators

Rx provides a large number of operators, such as the following. The list spans several Rx implementations, as entries such as “Rx.rb version of Switch” show.

Aggregate, All, Amb, ambArray, ambWith, AssertEqual, averageFloat, averageInteger, averageLong, blocking, blockingFirst, blockingForEach, blockingSubscribe, Buffer, bufferWithCount, bufferWithTime, bufferWithTimeOrCount, byLine, cache, cacheWithInitialCapacity, case, Cast, Catch, catchError, catchException, collect, concatWith, Connect, connect_forever, cons, Contains, doAction, doAfterTerminate, doOnComplete, doOnCompleted, doOnDispose, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, dropUntil, dropWhile, ElementAt, ElementAtOrDefault, emptyObservable, fromNodeCallback, fromPromise, fromPublisher, fromRunnable, Generate, generateWithAbsoluteTime, generateWithRelativeTime, Interval, intervalRange, into, latest (Rx.rb version of Switch), length, mapTo, mapWithIndex, Materialize, Max, MaxBy, mergeArray, mergeArrayDelayError, mergeWith, Min, MinBy, multicastWithSelector, nest, Never, Next, Next (BlockingObservable version), partition, product, retryWhen, Return, returnElement, returnValue, runAsync, safeSubscribe, take_with_time, takeFirst, TakeLast, takeLastBuffer, takeLastBufferWithTime, windowed, withFilter, withLatestFrom, zipIterable, zipWith, zipWithIndex

For details about each operator, see the official website. Operators come as static methods and instance methods, written below as Rx.Observable.xx and Rx.Observable.prototype.xx to tell them apart. Here are a few simple examples.

Rx.Observable.of converts ordinary data into a stream-style Observable, as in Rx.Observable.of(2) above.

Besides values, RxJS also provides operations on events. Rx.Observable.fromEvent listens for an event; each time the event fires, it is turned into an emission on the Observable and passed along. The following example listens for the keyup event of a text box; every keyup produces an event on the Observable.

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .subscribe(e => console.log(e));

Rx.Observable.prototype.map is used the same way as the array map method; the only difference is that here it transforms a stream and passes the new stream on. The example above already covers it, so I won't go into more detail here.

Rx.Observable.of(2)
             .map(v => 10 * v)
             .subscribe(v => console.log(v));

Rx provides many operators. To better understand what each one does, we can use marble diagrams, a visualization tool. In the marble diagram for map, for example, the arrow can be read as a timeline, and the values on the upper line are transformed by the operator in the middle into the values on the lower line.

Rx.Observable.prototype.mergeMap (alias flatMap) is another commonly used RxJS operator, and its marble diagram helps to understand it. Each value in the data flow creates a new branch stream (a stream within the stream), and mergeMap's job is to bring the branch streams back onto the trunk. The values from the branches then pass through the remaining operators on the trunk. In essence, mergeMap flattens streams within a stream.

Rx.Observable.prototype.switchMap: like mergeMap, switchMap channels branch streams back onto the trunk. The difference is that switchMap keeps only the most recent branch stream and cancels and discards the earlier ones.

Besides the marble diagrams mentioned above, such diagrams can also be drawn with ASCII characters. The following covers map, mergeMap, and switchMap together.

@map

-A------B-->
-2A-----2B->

@mergeMap                @switchMap

          ↗  ↗                     ↗  ↗
        a2 b2                    a2 b2
       /  /                     /  /
      /  /                     /  /
    a1 b1                    a1 b1
   /  /                     /  /
-A-B----------->         -A-B---------->
--a1-b1-a2-b2-->         --a1-b1---b2-->

In mergeMap and switchMap, A and B are values produced on the trunk; a1 and a2 are values produced on the branch created by A, and b1 and b2 are values produced on the branch created by B. As the diagram shows, they are eventually merged back onto the trunk. switchMap keeps only the last branch, so a2 from A is discarded.
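The difference between the two operators can also be reproduced in a small simulation. Everything below is a simplified sketch under my own assumptions, not the RxJS implementations: a stream is a function that takes an observer and returns an unsubscribe function, and the later array stands in for the passage of time.

```javascript
// mergeMap: subscribe to every branch and forward all of their values.
function mergeMap(source, project) {
    return observer => source({
        next: value => {
            project(value)({ next: inner => observer.next(inner) });
        }
    });
}

// switchMap: before subscribing to a new branch, cancel the previous one.
function switchMap(source, project) {
    return observer => {
        let cancelPrevious = () => {};
        return source({
            next: value => {
                cancelPrevious();
                cancelPrevious = project(value)({ next: inner => observer.next(inner) });
            }
        });
    };
}

const later = []; // callbacks that stand for "some time later"

// A branch emits name + '1' right away and name + '2' later, unless cancelled.
function branch(name) {
    return observer => {
        let active = true;
        observer.next(name + '1');
        later.push(() => {
            if (active) observer.next(name + '2');
        });
        return () => { active = false; };
    };
}

// The trunk emits 'a' and then 'b'.
function trunk(observer) {
    observer.next('a');
    observer.next('b');
    return () => {};
}

function run(operator) {
    const out = [];
    later.length = 0;
    operator(trunk, branch)({ next: value => out.push(value) });
    later.splice(0).forEach(callback => callback()); // let the delayed values arrive
    return out.join('-');
}

console.log(run(mergeMap));  // a1-b1-a2-b2
console.log(run(switchMap)); // a1-b1-b2
```

The two printed lines correspond to the bottom lines of the two ASCII diagrams.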

Rx.Observable.prototype.debounceTime: debounceTime takes a duration TIMES in milliseconds. After a value arrives, it is passed on to the next operator only if no new value arrives within TIMES milliseconds.

Many Operators in RxJS, such as map and reduce, match the way we already think when developing. In addition, both marble diagrams and ASCII diagrams are very helpful for learning and understanding RxJS.

Implementing the search example step by step with RxJS

RxJS provides many interfaces for creating and manipulating streams. Using them, let's convert our search example to Rx step by step.

Use the fromEvent interface provided by RxJS to listen for the keyup event of our input box. Each keyup produces an event on the Observable.

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .subscribe(e => console.log(e));

Here we don't want to print the events; we want to get the text input value, request a search, and finally render the results. Two new Operators are involved:

  • Rx.Observable.prototype.pluck('target', 'value'): takes the input event and outputs event.target.value.

  • Rx.Observable.prototype.mergeMap(): sends the search request and passes the results on to the Observer.

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .pluck('target', 'value') // <--
             .mergeMap(url => Http.get(url)) // <--
             .subscribe(data => render(data))

The code above implements a simple search with rendering, but it still suffers from the two problems mentioned at the beginning. So how do we reduce the number of requests and cancel the ones that are no longer useful? Let's look at other Operators that RxJS provides to solve these problems.

  • Rx.Observable.prototype.debounceTime(TIMES): once a value arrives, it is passed on to the next step only if no new value arrives within TIMES milliseconds. This matches the earlier approach of using setTimeout to limit how often the request is sent.

  • Rx.Observable.prototype.switchMap(): replacing mergeMap with switchMap cancels the previous, now useless request and keeps only the stream of the last request, which ensures that the displayed results belong to the latest search.

The final implementation is as follows. Compared with the initial implementation, it is clear that RxJS makes the code very concise.

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .debounceTime(250) // <- debounce the input
             .pluck('target', 'value')
             .switchMap(url => Http.get(url)) // <- Kill the previous requests
             .subscribe(data => render(data))

Conclusion

That concludes this introduction to RxJS. Other aspects of RxJS are left for further analysis and study later. As a library, RxJS can be used together with many frameworks, but not every situation calls for it. Its strengths show best with complex data sources and a lot of asynchronous work. On this point, I recommend reading “Flowing Data – Using RxJS to Construct Complex Single-page Application Data Logic” by Uncle Worker, which should give you a better understanding.

Appendix: RxJS (JavaScript) github.com/Reactive-Ex… RxJS (TypeScript) github.com/ReactiveX/r…

See more articles >> github.com/joeyguo/blo…