Preface
While I was writing my articles and booklet on RxJava, some readers asked for articles about RxJava in practice. I agreed, but never found a good opportunity: at Qiusbai.com I was always maintaining an old project, with few chances to use RxJava in depth at the business level. Calling a network interface with RxJava is too trivial to serve as a case, so I never had suitable business code to show. Recently I have been working on a new project related to live broadcasting, and I used RxJava in a small but important piece of its business logic. This article uses that case to introduce one way of thinking about RxJava in production.
What to make of RxJava
Before I talk about how to use RxJava, I want to talk about how to understand it, because how you look at it determines how you use it. I explain this in detail in chapter 6 of my Juejin booklet, "Responsive Programming: An Advanced Guide to RxJava". If you are interested, please consider supporting it.
In short: think of RxJava as a programming paradigm for building relationships between objects (pieces of business logic), not as an asynchronous programming library.
So what does a paradigm for building relationships look like? Take app initialization as an example. Suppose our app needs to initialize the SDK, initialize the database, and log in, and then jump to the next page. What are the business relationships here?
The initialization tasks and their relationships are shown below:
It should look like the figure above. SDK initialization, database initialization, and the login interface do not depend on one another, and the page jump happens only after all three have finished. That sorts out the relationships between the tasks in this small requirement. So how do we build those relationships in code with RxJava?
In the past, we might have written this logic as follows:
initSDK(context);                              // business 1
initDatabase(context);                         // business 2
login(getUserId(context), new Callback() {     // business 3
    @Override
    public void onSucc() {
        startActivity();                       // business 4
    }
});
This is very common initialization code, but it runs the tasks in strict sequence, which is not the real relationship between them. The real relationship is: complete businesses 1, 2, and 3 in any order, and only then run business 4.
If you want the code to reflect the real relationships between the tasks, RxJava comes in handy.
// Initialize the SDK (business 1)
Observable<Boolean> obserInitSDK = Observable.fromCallable(() -> { initSDK(context); return true; })
        .subscribeOn(Schedulers.newThread());
// Initialize the database (business 2)
Observable<Boolean> obserInitDB = Observable.fromCallable(() -> { initDatabase(context); return true; })
        .subscribeOn(Schedulers.newThread());
// Complete the login (business 3); login(...) is assumed here to block and return whether it succeeded
Observable<Boolean> obserLogin = Observable.fromCallable(() -> login(getUserId(context)))
        .subscribeOn(Schedulers.newThread());
// Run the three tasks independently of one another
Observable<Boolean> observable = Observable.merge(obserInitSDK, obserInitDB, obserLogin);
// Once all three have completed, jump to the next page (business 4)
observable
        .ignoreElements()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(() -> startActivity());
Of course, most of the time we do not need to write code that follows the business relationships this strictly, because it is not necessary. For example, initSDK(context) and initDatabase(context) may not be time-consuming, so we can simply run them one after another on a single thread. But what if each of them has to call a network interface? At that point, building the business relationships with RxJava is clearly both easier and more efficient.
This is a concrete example of programming in the relationship-building paradigm. Building relationships is what gives RxJava its ability to speed up complex or time-consuming operations and to untangle complex logic. Conversely, once you understand RxJava in terms of building relationships, you will be comfortable with it, instead of confining it to asynchronous programming and missing the forest for the trees.
For a more in-depth treatment of RxJava, see my booklet "Responsive Programming: An Advanced Guide to RxJava".
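To make the relationship itself concrete, independent of any library, here is a minimal sketch of the same dependency graph using the JDK's CompletableFuture rather than RxJava. The step names come from the example above. The class name, the shared log, and the 100 ms sleep standing in for real work are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class StartupGraph {
    /** Runs the three independent steps asynchronously, then the final step; returns the event log. */
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();

        // Businesses 1-3 have no edges between them, so each gets its own async task.
        CompletableFuture<Void> initSdk = CompletableFuture.runAsync(() -> step(log, "initSDK"));
        CompletableFuture<Void> initDb = CompletableFuture.runAsync(() -> step(log, "initDatabase"));
        CompletableFuture<Void> login = CompletableFuture.runAsync(() -> step(log, "login"));

        // Business 4 depends on all three: the only ordering the requirement imposes.
        CompletableFuture.allOf(initSdk, initDb, login)
                .thenRun(() -> log.add("startActivity"))
                .join();
        return log;
    }

    // Stand-in for real work such as a network call (hypothetical 100 ms delay).
    private static void step(List<String> log, String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.add(name);
    }

    public static void main(String[] args) {
        // The first three entries may appear in any order; "startActivity" is always last.
        System.out.println(run());
    }
}
```

The first three log entries can appear in any order, which is the point: the code states only the dependency that actually exists.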
Implementing the business logic with RxJava
Now for the main topic: a real case of RxJava in practice.
First, the background. The message module of the live broadcast room sends and receives live messages in real time over a WebSocket connection, on its own thread. The back end also provides an interface that returns the latest offline messages, used to fetch the live messages produced while the client was disconnected; that call obviously runs on yet another thread, or on the main thread. The requirement is this: when the WebSocket disconnects abnormally, the client must reconnect. Once the connection is re-established, the client receives new WebSocket messages and, at the same time, pulls the messages produced during the offline period. Both kinds of message must be forwarded to the upper-layer module in chronological order, that is, offline messages come before new messages.
The following figure is a schematic diagram of the business logic I drew, which basically describes the overall structure of the business logic:
With the requirement in hand, let's analyze the likely difficulties:
- New WebSocket messages and offline messages pulled from the interface arrive on different worker threads, yet they must be forwarded to the main thread in order
- After the WebSocket reconnects, it starts receiving new messages immediately while the offline messages are still being pulled from the offline interface, yet the offline messages must always come before the new ones
How would you normally implement this without RxJava, and how complex would it be?
I did not think that through, because my first concern was how to do it with RxJava. My principle here is: for complex business logic, reach for RxJava first.
We still think in terms of building relationships, but first we need to decide which parts should be implemented with RxJava at all. In this case, WebSocket disconnection and reconnection, and the way the WebSocket receives messages, are either wrapped by the SDK or triggered by the SDK's own callbacks, so we should not force them into RxJava. That leaves roughly three pieces of business logic that really use RxJava:
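For comparison, a hand-rolled version without RxJava could look roughly like the sketch below. It is an assumption about what such code might be, not code from the project: the class and method names are hypothetical, and it deliberately leaves out resetting state on each reconnect, dispatching to the main thread, and error handling, all of which a real implementation would also need.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hand-rolled ordering: live messages are held back until the offline batch has been forwarded. */
class OrderedRelay {
    private final Object lock = new Object();
    private final List<String> pendingLive = new ArrayList<>();
    private final Consumer<String> downstream;
    private boolean offlineDelivered = false;

    OrderedRelay(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    /** Called from the WebSocket thread for every new message. */
    void onLiveMessage(String message) {
        synchronized (lock) {
            if (!offlineDelivered) {
                pendingLive.add(message); // arrived too early: park it
                return;
            }
            downstream.accept(message);
        }
    }

    /** Called from the HTTP thread once the offline list has arrived (it may be empty). */
    void onOfflineMessages(List<String> offline) {
        synchronized (lock) {
            offline.forEach(downstream);
            pendingLive.forEach(downstream); // flush what arrived in the meantime
            pendingLive.clear();
            offlineDelivered = true;
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        OrderedRelay relay = new OrderedRelay(received::add);
        relay.onLiveMessage("live-1");                               // arrives before the offline batch
        relay.onOfflineMessages(List.of("offline-1", "offline-2"));
        relay.onLiveMessage("live-2");
        System.out.println(received); // prints [offline-1, offline-2, live-1, live-2]
    }
}
```

Even this stripped-down version needs a lock, a flag, and a pending buffer just to express one ordering rule.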
- Receive messages from the WebSocket and forward them (call this business 1)
- Fetch messages from the offline list interface and forward them (call this business 2)
- Forward the messages from both channels to the Activity for display (call this business 3)
One might ask why ordering is not listed as a business of its own, since it looks a lot like one. The reason is that the ordering here is really the relationship between business 1 and business 2. The diagram has no separate way to show that relationship, so I drew it as a box; it looks like a business, but it does not really count as one.
Each of the three businesses is simple, and their relationship can be summed up in one sentence: the data from business 2 must be forwarded to business 3 before the data from business 1.
// Business 1
private var onLineStream = ReplayProcessor.create<String>().toSerialized()

// Messages received over the WebSocket are forwarded straight here
fun senMessageFromOnLine(message: String = "", isCompleted: Boolean = false) {
    val stream = onLineStream
    if (stream.hasComplete() || stream.hasThrowable()) {
        return
    }
    if (!TextUtils.isEmpty(message)) {
        stream.onNext(message)
    }
    if (isCompleted) {
        stream.onComplete()
    }
}

// Business 2
// Fetches offline messages through the interface; returns an empty list if there are none
fun getOfflineMessageFlowable(): Flowable<String> {
    return Flowable.create(object : FlowableOnSubscribe<String> {
        override fun subscribe(emitter: FlowableEmitter<String>) {
            emitter.onNext(getOfflineMessageFromServer())
            emitter.onComplete()
        }
    }, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.io())
}
Why Flowable instead of Observable? Because messages are cached and backpressure is possible, Flowable is the sensible choice (see the article "The Most Friendly Article About RxJava: Backpressure").
Why are WebSocket messages handled by a Processor and not a plain Flowable? Because WebSocket messages come from outside, rather than being data we create inside RxJava, so a Processor, which can be pushed into from an external callback, is more flexible.
With the first two businesses defined, the third is also very simple. The key question is how to connect the three by building their relationship. We summarized that relationship earlier: from RxJava's point of view, receive the offline messages first, then receive the WebSocket messages. Searching the documentation with that requirement in mind quickly leads to the concat operator. So:
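The replaying part matters because messages can arrive over the WebSocket before anything has subscribed to the live stream, and they must not be lost. The toy class below is a sketch of that behaviour only: it is not RxJava's ReplayProcessor, supports a single subscriber, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy stand-in for a replaying processor: not RxJava's implementation, single subscriber only. */
class TinyReplay<T> {
    private final List<T> history = new ArrayList<>();
    private Consumer<T> subscriber; // null until someone subscribes

    /** Imperative entry point, callable from an SDK callback. */
    synchronized void onNext(T item) {
        history.add(item);
        if (subscriber != null) {
            subscriber.accept(item);
        }
    }

    /** A late subscriber first receives everything pushed so far, then live items. */
    synchronized void subscribe(Consumer<T> consumer) {
        history.forEach(consumer);
        subscriber = consumer;
    }

    public static void main(String[] args) {
        TinyReplay<String> stream = new TinyReplay<>();
        stream.onNext("live-1");             // pushed before anyone listens
        List<String> seen = new ArrayList<>();
        stream.subscribe(seen::add);         // replays live-1
        stream.onNext("live-2");
        System.out.println(seen);            // prints [live-1, live-2]
    }
}
```

The onNext method is the push-style entry point that a source created inside RxJava does not expose, which is the flexibility referred to above.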
/**
 * Call this after the WebSocket connection succeeds
 */
fun connected() {
    // concat always subscribes to the first Flowable and waits for it to complete
    // before subscribing to the second Flowable
    Flowable.concat<String>(getOfflineMessageFlowable(), onLineStream.onBackpressureBuffer(150, true))
        .observeOn(AndroidSchedulers.mainThread())
        // Forward the ordered messages to the Activity (business 3)
        .subscribe { message -> dispatchToActivity(message) }
}
That is essentially all of the core code: these few lines fulfill the requirement.