This article is published by the Java Architects Association, which updates technical articles daily

Back pressure

This section first introduces what is the Backpressure problem, and then introduces several coping modes of the problem.

What is the back pressure problem

When the upstream and downstream flow operation are in a different thread, if the ejection data faster than downstream receive the speed of processing data, and for those who can cause backlog before processing the data, the data is not lost, and will not be recycled garbage collection mechanism, but in an asynchronous cache pool, if the data in the buffer pool has been can not get treatment, Eventually, you run out of memory, which is the back pressure problem in responsive programming.

An example of a back pressure problem is shown below:

package com.crazymaker.demo.rxJava.basic; // omit import @slf4j public class BackpressureDemo {/** * demo does not use back pressure */ @test public void testNoBackpressure() throws Observable Observable = Observable.create(new Observable.OnSubscribe<String>() {// Observable (subject) Observable Observable = observable. create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? Super String> subscriber) {// loop 10 times for (int I = 0; i<10 ; i++) { log.info("produce ->" + i); subscriber.onNext(String.valueOf(i)); }}}); // Observer Action1<String> subscriber = new Action1<String>() {public void call(String s){try {// Each purchase interval 50 milliseconds Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } log.info("consumer ->" + s); }}; // Subscribe: The relationship between Observable and subscriber is still connected by subscribe(). Observable .observeOn(Schedulers.newThread()) .subscribe(subscriber); Thread.sleep(Integer.MAX_VALUE); }}Copy the code

In the sample code, observable emission is performed on an IO thread fetched through the schedulers.io () scheduler, and observer subscriber consumption is performed on a newThread fetched through the schedulers.newthread () scheduler. Observable stream continuously sends data, 10 times in total. Observer subscriber receives data every 50 milliseconds.

After running the above demo, the output looks like this:

[RxIoScheduler 17:56:17. 719-2) INFO C.C.D.R.B.B ackpressureDemo - produce - > 0 17:56:17. The 723 [RxIoScheduler - 2] INFO 17:56:17 C.C.D.R.B.B ackpressureDemo - produce - > 1. 723 [RxIoScheduler - 2] INFO C.C.D.R.B.B ackpressureDemo - produce - > 2 [RxIoScheduler 17:56:17. 723-2) INFO C.C.D.R.B.B ackpressureDemo - produce - > 3 17:56:17. [RxIoScheduler - 2] 723 INFO C.C.D.R.B.B ackpressureDemo - produce - > 4 17:56:17. 723 [RxIoScheduler - 2] INFO C.C.D.R.B.B ackpressureDemo - produce - > 5 [RxIoScheduler 17:56:17. 723-2) INFO C.C.D.R.B.B ackpressureDemo - produce - > 6 17:56:17. [RxIoScheduler - 2] 723 INFO C.C.D.R.B.B ackpressureDemo - produce - > 7 17:56:17. [RxIoScheduler - 2] 723 INFO C.C.D.R.B.B ackpressureDemo - produce - > 8 [RxIoScheduler 17:56:17. 723-2) INFO C.C.D.R.B.B ackpressureDemo - produce - > 9 17:56:17. 774 INFO [RxNewThreadScheduler - 1] C.C.D.R.B.B ackpressureDemo - consumer - > 0 17:56:17. 824] [RxNewThreadScheduler - 1 the INFO C.C.D.R.B.B ackpressureDemo - [RxNewThreadScheduler 17:56:17 consumer - > 1. 875-1] INFO C.C.D.R.B.B ackpressureDemo - consumer - > 2 17:56:17. 925 [RxNewThreadScheduler - 1] INFO C.C.D.R.B.B ackpressureDemo - consumer - > 3 17:56:17. 976 INFO [RxNewThreadScheduler - 1] C.C.D.R.B.B ackpressureDemo - consumer - > 4 17:56:18. 027] [RxNewThreadScheduler - 1 the INFO C.C.D.R.B.B ackpressureDemo - Consumer - > 5 17:56:18. [078] RxNewThreadScheduler - 1 the INFO C.C.D.R.B.B ackpressureDemo - consumer - > 6 17:56:18. 129 [RxNewThreadScheduler - 1] INFO C.C.D.R.B.B ackpressureDemo - consumer - > 7 17:56:18. 179 INFO [RxNewThreadScheduler - 1] C.C.D.R.B.B ackpressureDemo - consumer - > 8 17:56:18. 230] [RxNewThreadScheduler - 1 the INFO C.C.D.R.B.B ackpressureDemo - consumer ->9Copy the code

The above program has one feature: the speed of the producer Observable ejection data is faster than that of the downstream consumer subscriber receiving and processing data. However, due to the small amount of data, the above program runs without any problems.

A simple change to the producer, the original catapult 10 to unlimited catapult, the code is as follows:

Observable Observable = observable. create(new observable. OnSubscribe<String>() {@override public void call(Subscriber<? Super String> subscriber) {// unrestricted loop for (int I = 0; ; i++) { //log.info("produce ->" + i); subscriber.onNext(String.valueOf(i)); }}});Copy the code

Running the demo again throws the following exception:

Caused by: rx.exceptions.MissingBackpressureException
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext
(OperatorObserveOn.java:160)
at rx.internal.operators.OperatorSubscribeOn$SubscribeOnSubscriber.onNext
(OperatorSubscribeOn.java:74)
at com.crazymaker.demo.rxJava.basic.BackpressureDemo$1.call
(BackpressureDemo.java:24)
at com.crazymaker.demo.rxJava.basic.BackpressureDemo$1.call
(BackpressureDemo.java:19)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OperatorSubscribeOn$SubscribeOnSubscriber.call
(OperatorSubscribeOn.java:100)
at rx.internal.schedulers.CachedThreadScheduler$EventLoopWorker$1.call
(CachedThreadScheduler.java:230)
 ... 9 more
Copy the code

Abnormal cause: The speed of the upstream Observable streaming ejection data is much faster than the speed of the downstream Observable receiving the ejection data through subscriber, so the queue space used by the Observable to temporarily store ejection data is exhausted, resulting in the backlog of upstream data.

Several coping modes of back pressure problems

How do you deal with back pressure? You can use an overloaded Create method of the Observable class to set a specific backpressure pattern when creating a theme. The source code for this method is as follows:

 public static <T> Observable<T> create(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure) {
 return unsafeCreate(new OnSubscribeCreate<T>(emitter, backpressure));
 }
Copy the code

The second parameter to this method is used to specify a backpressure mode. There are many types of back pressure mode, have recently “model” that are widely used in the Emitter. BackpressureMode. The LATEST. The implication of this model is that if consumption can’t keep up, only cache the data that was recently catapulted, discarding older data directly.

Using the “nearest mode” back pressure, rewrite the test case in Section 4.8.1 as follows:

// @test public void testBackpressure() throws InterruptedException { Observable Observable = Observable.create(new Action1< String>> () {@override public void Call (Emitter<String> Emitter) {// For (int I = 0; ; i++) { //log.info("produce ->" + i); emitter.onNext(String.valueOf(i)); } } }, Emitter.BackpressureMode.LATEST); // Subscriber = new Action1<String> subscriber = new Action1<String>() {public void Call (String s) {try {// Each purchase interval is 50 milliseconds Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } log.info("consumer ->" + s); }}; / / subscribe: Observable and subscriber are still associated by subscribe(). Observable subscribeOn(schedulers.io ()).observeon (schedulers.newthread ()) .subscribe(subscriber); Thread.sleep(Integer.MAX_VALUE); }Copy the code

Run the demo and some of the output is excerpted below:

[RxNewThreadScheduler 18:51:54. 736-1] INFO C.C.D.R.B.B ackpressureDemo - consumer - > 0 18:51:54. 745 [RxNewThreadScheduler - 1] INFO C.C.D.R.B.B ackpressureDemo - consumer - > 1 / / omit part of the output 18:51:55. 217] [RxNewThreadScheduler - 1 The INFO C.C.D.R.B.B ackpressureDemo - consumer - > 123 18:51:55. 220] [RxNewThreadScheduler - 1 the INFO C.C.D.R.B.B ackpressureDemo - Consumer - > 124 18:51:55. [224] RxNewThreadScheduler - 1 the INFO C.C.D.R.B.B ackpressureDemo - consumer - > 125 18:51:55. 228 [RxNewThreadScheduler - 1] INFO C.C.D.R.B.B ackpressureDemo - consumer - > 126 18:51:55. 232 INFO [RxNewThreadScheduler - 1] C.C.D.R.B.B ackpressureDemo - consumer - > 127 18:51:55. 236] [RxNewThreadScheduler - 1 the INFO C.C.D.R.B.B ackpressureDemo - Consumer - > 7337652 18:51:55. [240] RxNewThreadScheduler - 1 the INFO C.C.D.R.B.B ackpressureDemo - consumer - > 7337653 18:51:55. [244] RxNewThreadScheduler - 1 the INFO C.C.D.R.B.B ackpressureDemo - consumer - > 7337654 / / omit part of the output 18:51:55. 595 [RxNewThreadScheduler - 1] INFO C.C.D.R.B.B ackpressureDemo - consumer - > 7337747 18:51:55. 598 INFO [RxNewThreadScheduler - 1]  c.c.d.r.b.BackpressureDemo - consumer ->14161628Copy the code

As you can see from the output, the upstream topic continues to bounce, and the downstream subscriber jumps straight to 7337652 after receiving 127, with the millions of data (relatively old data) that bounced in the process being thrown away.

In addition to the Emitter. BackpressureMode. LATEST “model” recently, RxJava through an enumerated constants in the Emitter interface defines the following kinds of back pressure mode:

Enum BackpressureMode {/ * * * No backpressure is applied (No back pressure mode) * may lead to rx. Exceptions. MissingBackpressureException anomalies * or IllegalStateException */ NONE, /** * If the consumer can't keep up, An rx. Exceptions. Abnormal MissingBackpressureException * / ERROR, / * * * all cache onNext method to eject the message, /** * If the downstream consumption can't keep up, discard the new message from onNext */ DROP, /** * If the consumer can't keep up, discard the old message, cache the new message from onNext */ LATEST}Copy the code

The above RxJava back pressure mode is described as follows:

(1) Backpressuremode. DROP: In this mode, the Observable theme uses a buffer of fixed size 128. If the downstream subscriber cannot process it, the first element of the stream is cached and subsequent elements are discarded.

LATEST: This mode is similar to backpressuremode. DROP, and the Observable theme also uses a fixed size buffer of 128. Backpressuremode. LATEST has a different cache policy, replacing the cached element with the LATEST pop-up element. When the consumer can process the next element, it receives the last element that pops up in the Observable.

(3) BackpressuRemode. NONE and backpressuremode. ERROR: Data sent in these two modes does not use back pressure. If observables theme ejection data faster than downstream by the subscriber receiving speed, causing the upstream data backlog, will be thrown MissingBackpressureException anomalies.

(4) Backpressuremode. BUFFER: In this mode, there is an infinite BUFFER (128 at initialization) and all downstream elements that cannot be consumed are put into the BUFFER. If the buffer continues to accumulate, memory will run out and OutOfMemoryException will be thrown.

This article explains the content of SpringCloudRPC remote call core principle: RxJava responsive programming framework, several coping modes of back pressure problem

  1. The next article will explain the core principle of SpringCloudRPC remote call: Hystrix RPC protection principle;
  2. Feel good friends can forward this article to pay attention to small;
  3. Thank you for your support!