By: no dishes studio – Marklux
Source: Marklux’s Pub
The copyright belongs to the author, please indicate the source
Introduction to Responsive Programming (RxJava)
background
With the development of the time, the programming domain constantly introducing new technology to try to solve the existing problems, * * reactive programming (streaming programming) * * it is very popular in recent years a solution, if we focus on industry trends, you will find most of the languages and frameworks are beginning to support the programming model:
- Java 8 => introduces Stream, Observable, and Observer modes
- Spring 5 => Introduces WebFlux and fully adopts the responsive model at the bottom
- RxJava was github’s most popular project for a long time last year
It can be predicted that responsive programming will be applied in the development field on a large scale in the future, and Ali has also started relevant transformation. At present, taobao application architecture has embarked on the path of streaming architecture upgrade. As a development, it is necessary to master responsive programming paradigm. The following will give a brief introduction of related concepts and basic programming ideas based on RxJava 2.0 (based on the summary of the first three chapters of the book “Learning RxJava”)
The basic idea
The definition of Reactive Programming (RP) is controversial. Wikipedia defines it as a programming paradigm, ReactiveX defines it as an enhancement of design patterns (observer patterns), and some people think RP is just an assembly of existing wheels… We can discuss the nature of RP briefly at the end of this article, but from a learning perspective, I think the best way to think of RP is as an event – and stream-oriented programming idea, as follows:
Java advocates OOP programming ideas, so for Java programmers, the program is a combination of various objects, programming is a process to control the state and behavior of objects.
RP advocates the idea of flow-oriented programming, so that for developers, events and data are all presented as streams, sometimes in parallel, sometimes in intersection, and programming is a process of observing and regulating those streams.
Start in the real world
One of the most obvious physical phenomena in the real world is that everything moves, whether it’s traffic, weather, people, even a rock, depending on the earth’s rotation. The movement of different objects may not interfere with each other, such as vehicles and pedestrians moving on different roads, or there may be crossover, such as pedestrians and vehicles meeting at the same intersection.
Return to our programming, we tend to abstract into multiple processes, as well as in the real world, these processes are also change and movement, sometimes between them can be parallel, sometimes dependent (preconditions, postconditions), traditional programming model, we tend to adopt multithreading or asynchronous way to deal with these processes, but this way is not natural.
RP therefore abstracts and samples from the real world and divides programs into the following three components:
- Observable: Something that can be observed, events and data flows
- An Observer of the flow
- Operator: Operator, something that does joins and filters on streams and so on
Let’s use a simple example to introduce these three concepts:
Observable<String> myStrings =
Observable.just("Alpha"."Beta"."Gamma"."Delta"."Epsilon");
myStrings.map(s -> s.length())
.subscribe(s -> System.out.println(s));
Copy the code
In the example above, myStrings is an Observable, map(s -> s.length()) is an Operator, and subscribe(s -> system.out.println (s)) is an Observer.
This example will take the length of several strings and print them one by one.
Observable
An Observable is simply a stream, and there are only three core apis in RxJava:
onNext()
: Passes an objectonComplete()
: A completed “signal”onError()
: Pass an error
Basically all streams are a wrapper around these three methods.
create
-
Using observables. The create ()
Observable<String> myStrings = Observable.create(emitter -> { emitter.onNext("apple"); emitter.onNext("bear"); emitter.onNext("change"); emitter.onComplete(); }); Copy the code
-
The use of observables. Just ()
Observable<String> myStrings = Observable.just("Alpha"."Beta"."Gamma"."Delta"."Epsilon"); Copy the code
Note that the number of elements created by this method must be limited
-
Create from other data sources, such as Observable.fromiterable (), Observable.range()
Hot & Cold Observables
Cold streams produce static data, like a CD, that can be heard whenever anyone listens.
Observable<String> source =
Observable.just("Alpha"."Beta"."Gamma"."Delta"."Epsilon");
//first observer
source.subscribe(s -> System.out.println("Observer 1 Received: " + s));
//second observer
source.subscribe(s -> System.out.println("Observer 2 Received: " + s));
Copy the code
The above two SUBSCRIBE registered observers will get exactly the same stream
Data generated by Hot streams is dynamic, just like a radio station. If it misses the broadcast time slot, the past data cannot be retrieved. The Listener is required to create a Hot stream directly. A JavaFx example is provided, but it is not suitable for this purpose.
In fact, it’s common to convert a Cold stream into a Hot stream with a ConnectableObservable:
ConnectableObservable<String> source =
Observable.just("Alpha"."Beta"."Gamma"."Delta"."Epsilon")
.publish();
//Set up observer 1
source.subscribe(s -> System.out.println("Observer 1: " + s));
//Set up observer 2
source.map(String::length)
.subscribe(i -> System.out.println("Observer 2: " + i));
//Fire!
source.connect();
Copy the code
Publish () creates a ConnectableObservable, and connect() initiates the flow, in which case the Source passes all data to both observers. If you then register a new Observer for the Source, you will get no data because Hot streams do not allow data to be re-consumed.
Observers
The observer itself is a relatively simple structure, with the main functions implemented by the caller.
It is possible to create an Observer by implementing the Observer interface, although a more common case is to create an Observer through a lambda expression, as in the previous example, which is not detailed here.
The way to register is by calling the Subscribe method of Observerbale.
Operators
Just creating streams and observers doesn’t make much sense, and most of the time we need to do various operations on streams with operators to make RP meaningful.
RxJava provides a very rich set of operators, which can be broadly divided into the following categories:
- The Create operator is used to Create streams, several of which we’ve already used, such as Create, Just, Range, and Interval
- Transform operators used to change a flow into another form, such as Buffer (which converts elements of the flow into a set), Map (which performs a function on each element in the flow), Window (which splits elements of the flow into different Windows and emits them)
- Filter operator that filters out part of the data in the stream to get the specified data elements, such as Filter, First, and Distinct
- Combine operators that Merge multiple streams into a single stream, such as And, Then, Merge, Zip
- Conditional/arithmetic/aggregate/conversion operators… Play a variety of computing auxiliary role
- Custom operators, created by the user
If we could expand each of these operators, we could almost write a book, and you can see the wealth of functionality that operators provide. Just a few operators related to back pressure are explored below.
Back pressure
The so-called back pressure refers to the strategy (also called backpressure) by which the consumers control the producer speed when the producer speed is higher than the consumer speed in an asynchronous environment. This is a flow control method, which can use resources more efficiently and prevent the avalanche of errors.
To implement the back pressure strategy, you can use the following operators
-
Throttling orifice class
The speed at which an Observable emits messages is modulated by operators, such as using sample() to periodically sample and emit the last data, or throttleWithTimeout() to discard data that has timed out. But doing so would throw away some of the data, leaving the consumer with no complete message.
-
Buffer & Window buffers and Window classes
Use buffers () and Windows () to temporarily store messages that are being sent too fast and release them when they slow down. This applies to scenarios where the Sending rate of observables is not uniform.
In addition to using operators, a way to achieve back pressure is Reactive pull.
By calling the request(n) method in the Observer, the production of the producer can be countercontrolled by the consumer. In other words, the producer only generates data when the consumer requests it, and then requests new data when the consumer has consumed the data. In this way, the production speed will not be too fast, as shown in the following figure
This requires the upstream Observable to respond to the request. In this case, several operators can be used to control Observable behavior:
-
onBackPressureBuffer
Make a cache for the data sent by the Observable. The generated data is stored in the cache first. Whenever a request comes in, the number of events is retrieved from the cache and returned.
-
onBackPressureDrop
The Observable command discards the later time until Subscriber calls the request(n) method again, and sends the n time after the call time.
Backpressure strategy is a field worthy of further study and discussion. Backpressure based on consumer messages makes dynamic flow limiting and circuit breaking possible, and because of the perception of backpressure, applications have the opportunity to achieve dynamic shrinkage and expansion.
Consider: Why do WE need RP
Now that we’ve covered the basic concepts of RP, it’s time to think about why RP is needed, how RP can be used on the server, and the advantages it can bring.
First of all, about the nature of RP, I think it is a wheel of asynchronous programming. The API of Observer mode makes the process of asynchronous programming clearer and simpler, just like GO uses CSP to simplify concurrent operations. The underlying technology is actually the encapsulation of existing technology.
The question, then, is why encapsulate asynchronous programming, what benefits can be gained from using it, and to solve this problem we need to go back to the beginning.
If you ask the server what the biggest performance bottleneck is, the answer must be IO, because the most time-consuming part of processing a request is waiting for IO, and waiting causes blocking, so if you want to improve performance, you can’t write blocking code.
How do I keep my code from blocking? On the Java server side, there are two traditional approaches:
- Use Thread to run business code and IO code in separate threads. But you have to deal with concurrency issues (resource contention), and we know from our previous analysis of Java thread scheduling that this is not efficient on CPU resources (context switching is expensive).
- With asynchronous callbacks, you can use Callback or Future, but you need to implement the scheduling logic yourself, and the code written in Callback is not easy to understand. Callbcak Hell is possible.
So finally, to solve the performance bottleneck, the RP gives the following solution:
Provide an excellent asynchronous processing framework, while simplifying the process of writing asynchronous code, ultimately achieve the big goal of reducing blocking, improve performance.