1. Case description
One quiet afternoon, a woman in the RxHttp group (group number: 378530627) reported a problem. Her app account had been kicked offline by a login on another device; after she logged in again and landed on the home page, one request's code executed but never received a callback. I could tell she was very worried.
What? Can that really happen? The normally quiet group suddenly came alive, and the guys fired off a round of wild guesses, which I summarized as follows:
- Is it possible that the request code never executed and she simply got it wrong?
- Did an exception occur before the request was sent, interrupting the code?
- Was the request accompanied by a page jump, so that it was closed automatically when the page was destroyed?
- Did an exception occur during the request and get swallowed by RxJava's global exception handler, so that no failure callback was delivered?
For context, she sends requests with RxHttp combined with RxJava.
After a first round of questions, all of the guesses above were quickly ruled out, and I had a rough picture of the case. I then reproduced it in code. To keep things simple I made some modifications, but the essence is unchanged.
2. Reconstructing the case
In the onCreate method of the home page's MainActivity, three requests are sent in parallel, as follows:
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.main_activity);
    request1();
    request2();
    request3();
}

public void request1() {
    RxHttp.get("/service/...")
        .asString()
        .to(RxLife.toMain(this)) // Close the request automatically when the page is destroyed; call back on the UI thread
        .subscribe(s -> {
            // Success callback
        }, throwable -> {
            // Failure callback
        });
}

public void request2() {
    // Request code omitted; it looks like request1()
}

public void request3() {
    // Request code omitted; it looks like request1()
}
This code looks fine, and after a normal login everything works.
But after the account is kicked offline and she logs in to the home page again, request1(), request2(), and request3() are all executed, yet request2() never receives a callback, neither success nor failure.
3. Opening the investigation
With all the guesses above ruled out, what next? Obviously, the first thing to establish was:
Did the request actually go out? Did the server receive it?
She then used Android Studio's built-in Profiler to monitor network traffic and found that the request had never been sent. She also checked with the backend developers, who confirmed that the server had not received it.
This was even stranger: the request code executes, yet the request never goes out? As a programmer, my first thought was: how is that possible? Is something wrong with your phone? Why not try a different one? Unsurprisingly, the problem was the same on another phone, which was embarrassing.
I then debugged with her and confirmed again and again that the request code itself was fine. I sank into thought, feeling desperate and helpless, and even began to suspect OkHttp.
Being an old hand, I eventually calmed down and went back over the clues, and realized I had overlooked one: the account being kicked offline, which automatically jumps to the login page. Why did the problem appear only after the account had been kicked offline? So I redirected the investigation:
- How is the kick-off detected? How does the app jump to the login page?
Finally the culprit surfaced, and the culprit was Looper. It turned out that she detected the kick-off in an OkHttp interceptor, showed a Toast via Looper, and then ran the page-jump logic, as follows:
public class TokenInterceptor implements Interceptor {
    private Context context;
    // Omit some code

    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        Response originalResponse = chain.proceed(request);
        String code = originalResponse.header("code");
        if ("1".equals(code)) { // The account has been kicked offline
            Looper.prepare();
            Toast.makeText(context, "Your account is logged in to another device.", Toast.LENGTH_LONG).show();
            context.startActivity(new Intent(context, LoginActivity.class));
            Looper.loop();
        }
        return originalResponse;
    }
}
You might ask: what is wrong with that? Isn't showing a Toast from a child thread via Looper perfectly normal? I do it all the time and have never had a problem, so why is there one here?
I asked her to comment out the Looper and Toast code and keep only the startActivity line in the if statement. After trying it, she told me happily, "OK, no more problem." How do we explain this?
4. Solving the case
Looper, feeling wronged, protested: "You call me the culprit, so I'm the culprit? Where is the proof?"
We know that the Looper.loop() method starts an infinite loop internally, as follows:
public static void loop() {
    // Omit some code
    for (;;) {
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }
        // Omit some code
    }
    // Omit some code
}
As you can see, the official comment on queue.next() says that it might block. When does it block? When the queue has no messages. So the thread that calls Looper.loop() is effectively stuck in an infinite loop.
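To make the blocking behavior concrete, here is a minimal plain-Java sketch. MiniLooper is a hypothetical stand-in built on a BlockingQueue, not Android's real Looper; it only illustrates that code placed after loop() (like the interceptor's return statement) is not reached until the loop is told to quit.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for android.os.Looper, for illustration only. */
final class MiniLooper {
    private static final Runnable QUIT = () -> { };
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    /** Blocks the calling thread until quit() is called. */
    void loop() {
        for (;;) {
            Runnable msg;
            try {
                msg = queue.take(); // blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (msg == QUIT) {
                return; // the only normal way out of the loop
            }
            msg.run();
        }
    }

    void quit() {
        queue.add(QUIT);
    }
}

final class MiniLooperDemo {
    /** Returns "whileLooping=<bool> afterQuit=<bool>" for the code placed after loop(). */
    static String run() {
        MiniLooper looper = new MiniLooper();
        AtomicBoolean reached = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            looper.loop();     // plays the role of Looper.loop() in the interceptor
            reached.set(true); // plays the role of "return originalResponse"
        });
        worker.start();
        try {
            worker.join(300); // the worker is still inside loop() after this wait
            boolean whileLooping = reached.get();
            looper.quit();
            worker.join(2000);
            return "whileLooping=" + whileLooping + " afterQuit=" + reached.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the interceptor from the case nothing ever asks the Looper to quit, so the thread stays inside the loop.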
So what does this have to do with our case?
The TokenInterceptor is invoked on an RxJava IO thread. The thread pool behind each RxJava IO worker is configured with a single core thread, so while one task is executing, other tasks must wait for it to finish.
In the IoScheduler class you can find the static nested class ThreadWorker, which extends NewThreadWorker. Inside NewThreadWorker we can find the thread pool object, as follows:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;
    // Omit some code

    public NewThreadWorker(ThreadFactory threadFactory) {
        // Create a thread pool
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    // Omit some code
}
Next, step into the SchedulerPoolFactory.create method:
public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); // The number of core threads is 1
    // Omit some code
    return exec;
}
As you can see, 1 is passed in as the number of core threads. Following the call further, we reach the constructor that creates the thread pool object:
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}
The pool has 1 core thread and, nominally, an unlimited number of non-core threads, which are recycled after being idle for more than 10 milliseconds; tasks are held in a delay queue. In practice, because the delay queue is unbounded, waiting tasks simply queue up and the pool never grows beyond its single core thread.
Heads up: here comes the key point.
Simply put, each such thread pool executes only one task at a time, that is, serially. This is where Looper comes in: the thread that called Looper.loop() is stuck in an infinite loop, and when the thread pool receives another task, that task must wait for the previous one to finish. Since the previous task never finishes, the next one is never executed. That is why the request code ran but the request was never sent.
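The starvation described above can be reproduced with the JDK alone. The sketch below is an assumption-laden illustration, not RxJava code: it creates the same kind of pool (a ScheduledThreadPoolExecutor with one core thread), blocks the first task with a latch in place of Looper.loop(), and checks whether a second task gets to run in the meantime.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SerialPoolDemo {
    /** Returns "early=<bool> late=<bool> largest=<int>". */
    static String run() {
        // Same kind of pool that Executors.newScheduledThreadPool(1, factory) builds.
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch unblockFirst = new CountDownLatch(1);
        CountDownLatch secondRan = new CountDownLatch(1);
        try {
            // First task: blocks, standing in for the task stuck in Looper.loop().
            pool.execute(() -> {
                firstStarted.countDown();
                try {
                    unblockFirst.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Second task: stands in for the request that never went out.
            pool.execute(secondRan::countDown);

            firstStarted.await();
            // While the first task is blocked, the second one stays in the queue.
            boolean early = secondRan.await(300, TimeUnit.MILLISECONDS);
            unblockFirst.countDown();
            // Once the first task finishes, the second one runs.
            boolean late = secondRan.await(2, TimeUnit.SECONDS);
            return "early=" + early + " late=" + late
                    + " largest=" + pool.getLargestPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The largest pool size reported at the end shows that the pool did not start an extra thread for the queued task, even though its maximum size is Integer.MAX_VALUE.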
5. Other thoughts
At this point, many readers will have questions:
- If each RxJava IO thread pool executes tasks serially, how does RxJava run tasks in parallel? Has all the parallel code we wrote actually been running sequentially?
- If a thread pool already has a task running, why is it handed a new task?
- Why doesn't RxJava configure its pools the way OkHttp's internal thread pool is configured, so that a non-core thread is started whenever a new task arrives?
OK, let's answer them one by one.
First, how does RxJava run tasks in parallel with the IO thread pools described above?
CachedWorkerPool is a static nested class of IoScheduler that maintains a queue of thread pools. Each time a new task arrives, a thread pool is taken from the queue to execute it; if the queue is empty, a new thread pool is created, as follows:
static final class CachedWorkerPool implements Runnable {
    // This is the thread pool queue
    private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
    final CompositeDisposable allWorkers;
    private final ThreadFactory threadFactory;
    // Omit some code

    // Retrieve a thread pool
    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        while (!expiringWorkerQueue.isEmpty()) {
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            if (threadWorker != null) {
                return threadWorker; // Found one in the queue
            }
        }
        // Nothing in the queue, so create a new one
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }

    // Recycle the thread pool: put it into the cache queue when the task is cancelled or completed
    void release(ThreadWorker threadWorker) {
        // Set the expiration time of the thread pool; keepAliveTime is 60s
        threadWorker.setExpirationTime(now() + keepAliveTime);
        expiringWorkerQueue.offer(threadWorker);
    }
    // Omit some code
}
With multiple thread pools, parallelism is achieved. In the release method above, note that a recycled thread pool is kept alive for 60 seconds. The CachedWorkerPool constructor also starts a periodic task that checks the thread pool queue every 60 seconds and shuts down and removes the pools that have expired.
Next, the second question: why is a thread pool that already has a task running handed a new task?
A thread pool is recycled in two situations. One is when its task has completed normally, which is easy to understand. The other is when the task has been cancelled, for example by calling Disposable#dispose(); however, this does not stop a task that is already running in the thread pool. As a result, the pool is recycled while its task is still executing, and the next task given to that pool has to wait.
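The reuse of a still-busy thread pool can be sketched in plain Java. MiniWorkerCache below is a hypothetical, heavily simplified analog of CachedWorkerPool (no expiration, no disposal), with each worker modeled as a single-thread ExecutorService; it is not RxJava's actual implementation. The demo shows both points: separate workers give parallelism, and a worker released while its task is still running makes the next task wait.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified analog of CachedWorkerPool; not RxJava's real code. */
final class MiniWorkerCache {
    private final ConcurrentLinkedQueue<ExecutorService> idle = new ConcurrentLinkedQueue<>();

    ExecutorService get() {
        ExecutorService cached = idle.poll();
        return cached != null ? cached : Executors.newSingleThreadExecutor();
    }

    void release(ExecutorService worker) {
        idle.offer(worker); // no check whether the worker is still busy
    }
}

final class StaleWorkerDemo {
    static String run() {
        MiniWorkerCache cache = new MiniWorkerCache();
        CountDownLatch stuckStarted = new CountDownLatch(1);
        CountDownLatch unstick = new CountDownLatch(1);
        CountDownLatch requestRan = new CountDownLatch(1);

        ExecutorService first = cache.get();
        ExecutorService second = cache.get(); // cache is empty, so this is a separate worker
        try {
            // A task that does not finish, standing in for the one stuck in Looper.loop().
            first.execute(() -> {
                stuckStarted.countDown();
                try {
                    unstick.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            stuckStarted.await();
            cache.release(first); // recycled although its task is still running

            ExecutorService reused = cache.get(); // the busy worker is handed out again
            reused.execute(requestRan::countDown); // stands in for request2()
            boolean ranWhileStuck = requestRan.await(300, TimeUnit.MILLISECONDS);
            unstick.countDown();
            boolean ranAfter = requestRan.await(2, TimeUnit.SECONDS);
            return "distinct=" + (first != second)
                    + " reusedStale=" + (reused == first)
                    + " ranWhileStuck=" + ranWhileStuck
                    + " ranAfter=" + ranAfter;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            first.shutdownNow();
            second.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```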
Finally, why did RxJava design its thread pools this way?
The reason is simple: to avoid wasting thread resources. As mentioned above, a thread pool may be recycled while its thread is still executing a task, which is wasteful. What to do? With the timer, a recycled thread pool is shut down and removed from the queue after a certain period. If RxJava instead relied on the thread pool itself to reclaim idle threads, a thread that had entered the infinite loop in Looper.loop() would never be reclaimed.
So why doesn't RxJava shut down the thread pool right away when it puts it into the cache queue? You are welcome to discuss this in the comments or in the group.
6. Summary
In this case, request2() received no callback because it was handed a recycled thread pool whose previous task was still running. That it happened to be request2() is incidental: as long as a recycled thread pool still has an unfinished task, any request later assigned to that pool has to wait. If the account is kicked offline 3 times within 60 seconds, then after logging in to the home page none of the 3 requests will be executed, because the 3 recycled thread pools can no longer run tasks until, 60 seconds later, the timer forcibly shuts them down and removes them.
Do not use Looper casually in a child thread. Instead, post a message through a main-thread Handler and handle the business logic there, or notify the main thread via EventBus, LiveData, or similar, and process the relevant logic on the main thread.
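As a rough illustration of this advice, the sketch below hands the UI work to another thread through a java.util.concurrent.Executor and returns immediately, instead of looping on the worker thread. KickedOfflineNotifier and its method names are hypothetical, and the response is modeled as a plain String so that the example runs without Android or OkHttp. On Android, the executor could for example be ContextCompat.getMainExecutor(context), or a main-thread Handler's post method (new Handler(Looper.getMainLooper())::post), and the Runnable would show the Toast and start LoginActivity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Hypothetical helper: hands UI work to the main thread instead of calling Looper.loop(). */
final class KickedOfflineNotifier {
    private final Executor mainThread;
    private final Runnable showToastAndOpenLogin;

    KickedOfflineNotifier(Executor mainThread, Runnable showToastAndOpenLogin) {
        this.mainThread = mainThread;
        this.showToastAndOpenLogin = showToastAndOpenLogin;
    }

    /** Meant to be called from the interceptor; never blocks the calling thread. */
    String onResponse(String codeHeader, String response) {
        if ("1".equals(codeHeader)) { // the account has been kicked offline
            mainThread.execute(showToastAndOpenLogin); // hand off and move on
        }
        return response; // always reached, so the worker thread is freed
    }
}

final class NotifierDemo {
    static String run() {
        // Stand-in for the main thread's message queue (assumption for this demo).
        List<Runnable> mainQueue = new ArrayList<>();
        List<String> log = new ArrayList<>();

        KickedOfflineNotifier notifier =
                new KickedOfflineNotifier(mainQueue::add, () -> log.add("ui"));

        String response = notifier.onResponse("1", "response");
        log.add("returned " + response);

        // Later, the "main thread" drains its queue and runs the UI work.
        for (Runnable task : mainQueue) {
            task.run();
        }
        return String.join(",", log);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Passing the executor in from outside keeps the interceptor logic free of thread-specific code and makes it easy to exercise in a plain unit test.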
About me
I am the author of RxHttp. I love open source and am committed to building a simple, easy-to-use HTTP request framework that supports RxJava2, RxJava3, and coroutines. It currently has 2200+ stars on GitHub and is quite popular. I also run a chat group, the RxHttp&RxLife discussion group (378530627), which now has 500+ members and frequent technical discussions. Please keep it on topic; serious participants only.