The problem background
An introduction to the service
First, a brief background on the service where the exception occurred. In short, the service is a consumer of a message queue: it subscribes to messages from an upstream message queue, processes them inside the service, and finally stores the results in a database, as shown by the dotted line in the figure below.
The service is deployed as a distributed system with several instances, as shown in figure 3. Each instance is responsible for consuming a portion of the partitions. For each partition, one receive thread and three process threads are started to receive the data from the message queue, transform it, and finally store it in the format the business requires. The consumption details for each partition are as follows:
- The receive thread receives the subscribed messages from Kafka and stores them in a local in-memory queue, msgQueue (a LinkedBlockingQueue). The pseudocode for the receive thread is shown below.
```java
while (running) {
    Msg msg = kafka.receive();
    msgQueue.offer(msg);
}
```
- Three process threads consume from each msgQueue and store their results into another queue, bizQueue (also a LinkedBlockingQueue from the JDK). The processing loop is also simple, as shown below.
```java
while (running) {
    Msg rawMsg = msgQueue.poll();
    // transform and process the raw message into the format the business needs
    ProcessedMsg processed = process(rawMsg);
    bizQueue.offer(processed);
}
```
- A scheduled task runs every 30 ms to retrieve the messages from bizQueue and store them in the database, as sketched below.
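The scheduled storage step is only described in prose in the article, so here is a minimal sketch of how such a flush task might look, assuming a ScheduledExecutorService and a hypothetical storeBatch method standing in for the real database write; it is an illustration, not the service's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the "flush bizQueue every 30 ms" step described above.
public class BizQueueFlusher<T> {

    private final LinkedBlockingQueue<T> bizQueue;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public BizQueueFlusher(LinkedBlockingQueue<T> bizQueue) {
        this.bizQueue = bizQueue;
    }

    public void start() {
        // Every 30 ms: drain whatever has accumulated and write it out as one batch.
        scheduler.scheduleWithFixedDelay(this::flushOnce, 30, 30, TimeUnit.MILLISECONDS);
    }

    private void flushOnce() {
        List<T> batch = new ArrayList<>();
        bizQueue.drainTo(batch);
        if (!batch.isEmpty()) {
            storeBatch(batch);
        }
    }

    // Hypothetical stand-in for the database write; the real persistence code is not shown in the article.
    private void storeBatch(List<T> batch) {
        System.out.println("storing " + batch.size() + " processed messages");
    }
}
```

drainTo moves everything currently in bizQueue into a local list in one call, so the 30 ms task naturally batches the database writes.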
The problem
Given this background, the reader may have the following questions.
- Why use a message queue like Kafka?
In this business context, the service in this article is not the only consumer of the data in the Kafka queue; many other services use the same messages for other processing tasks. This takes advantage of the decoupling [^1] that message queues provide. It is also why the message transformation and processing happen in the second step of this service: because there are multiple consumers and each cares about different parts of the content, this service needs to extract and process only the parts of each message that are relevant to its own business.
- Why are in-memory message queues used for internal service processing?
msgQueue and bizQueue are the service's two internal consumption queues. msgQueue buffers the messages received from Kafka so that slower processing does not block the receiver. Similarly, the database write is slow relative to processing, so bizQueue is needed to buffer the processed messages before they are stored.
- Is there a message ordering problem when three process threads handle the messages?
A very important problem with distributed queues is ordered consumption of messages [^1] (for example, a transfer notification message must be sent only after the transfer has completed). The usual way to ensure local ordering is to guarantee that the messages within a partition are consumed sequentially. In this service, three process threads handle messages in parallel; given the asynchrony that is one of the four basic characteristics of operating systems [^2], sequential execution obviously cannot be guaranteed. However, the business in this article is not sensitive to message ordering, so this design is acceptable. Caution is required in scenarios that are sensitive to message order.
Symptom & Troubleshooting steps
Problem symptoms
Some time after the service restarts, an OutOfMemoryError interrupts the processing flow (that is, consumption stops). The memory overflow was initially suspected to be a memory leak.
A memory leak is one possible cause of OOM: due to code problems or other reasons, objects that should have been collected cannot be reclaimed by the GC, so objects that should no longer be referenced keep accumulating and eventually clog up memory (like hair clogging a shower drain). However, not every OOM is caused by a memory leak; it can also be caused simply by insufficient memory, as in this article, where the problem was ultimately identified as a performance issue.
Troubleshooting steps
Dump the memory
At the beginning, of course, the priority was the eight-character mantra: "drink more hot water, try restarting."
After repeated restarts and repeated OOMs, we realized the problem was not so simple and that we needed to investigate the service seriously.
Step 1: Adjust the JVM startup parameters to dump automatically on OOM
When the service hits an OOM and there is no try-catch, the thread crashes and stops executing its task, which is roughly Java flipping the table. After that, the memory information has little reference value; the memory is most meaningful for analysis at the moment the OOM happens. So we adjusted the JVM startup parameters and added `-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/home/server/dump.hprof`, so that a heap dump file (dump.hprof) is produced automatically when an OOM occurs; the heap dump also captures thread stack information, which will be useful later.
Step 2: Analyze the dump with MAT on Linux
After modifying the parameters and restarting the service, the problem reappeared soon enough and a dump.hprof file was successfully produced. Next we needed to analyze the file with Eclipse's MAT tool. We usually use the Windows version of MAT, which has a full set of interactive features, but the dump file from the online service is 8 GB and the online production environment runs Linux; transferring the file to a Windows machine would waste too much valuable time, so I chose to run the MAT analysis directly on Linux.
Searching for Linux MAT online mostly turns up blogs copy-pasted from one another. You can download the Linux version of MAT from the official website and then run `./ParseHeapDump.sh` to generate a ZIP package, a few MB in size, containing the HTML pages of the memory analysis results (the usage of Linux MAT is the same as in those blogs and is not repeated in this article). Then download the ZIP file and open the index.html page inside it to get a rough overview of memory usage.
By analyzing the Class Histogram, the Top Consumers, and the Leak Suspects report, it was almost certain that too much data was piling up in the service's LinkedBlockingQueues (each up to 300 MB), which led to the OOM. That raises a question: from the background above we know the service uses a LinkedBlockingQueue in two places, so which one is it? I could not get any more detailed information out of the Linux MAT overview, and the search results on the web were uniformly copied from the same blog, with no mention of this issue.
Step 3: Transfer the dump file from Linux to Windows and analyze it with Windows MAT
Based on the Linux MAT results, we narrowed the suspects down to msgQueue and bizQueue, but the report generated on Linux could not support further analysis. So I came back to the question from the beginning of step 2: how do I transfer an 8 GB file from Linux to my Windows machine?
After a round of searching, I found the following options:
- Use the sz command. However, the Windows side has a file size limit for sz, and running sz directly on the 8 GB dump triggers a pop-up error. After another round of searching, the only workaround I found was to split the file with `cat dump.hprof | split -b 2G - dump.hprof`, sz the pieces one by one, and then merge them back together on Windows with the cmd command `copy /B dump.hprof.a + dump.hprof.b + dump.hprof.c dump.hprof`.
- Run `python -m SimpleHTTPServer` (assuming the Linux machine has a Python environment), which starts a small HTTP server; then type `linuxhost:8000` into the Windows browser address bar to download the dump file.

I adopted the latter.
Step 4: MAT memory analysis process and conclusion
From the previous step we know the problem lies in a LinkedBlockingQueue, and we now have the dump file downloaded. Opening it with the MAT tool on Windows lets us further confirm which queue is overflowing. In the Dominator Tree, we can see the LinkedBlockingQueues with the highest retained-memory percentages.
By looking at the incoming references of these LinkedBlockingQueues, we located msgQueue.
According to the background analysis above, if bizQueue were piling up, the blockage would most likely be in step 3 (the scheduled storage task), leaving the messages produced into bizQueue unconsumed. Since it is msgQueue that is piling up, the most likely cause is that the three process threads are blocking during processing. So I used the thread overview (thread stack dump) feature of the memory dump: the processor threads are given distinctive names when they start, and after searching for them by name and checking the execution state of each thread, I found no sign that the processor threads were blocked.
So far, with the other possibilities eliminated, only one remains: it is not that consumption is too slow, but that production is too fast. By reading the comments in the code history and analyzing the memory stack, I found that each packet currently fetched from Kafka contains 100 messages, so at the same polling speed the receiver writes far more messages into msgQueue per unit time than the three processors can drain.
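The article does not say how the per-packet message count is configured. If the service uses the standard Kafka Java consumer, the relevant knob would be max.poll.records; the sketch below is only an assumption about such a setup (broker address, group id, and string deserializers are placeholders), not the service's actual code.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReceiverConsumerConfig {

    // Builds a consumer whose poll() returns at most maxPollRecords messages at a time.
    public static KafkaConsumer<String, String> buildConsumer(int maxPollRecords) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092"); // placeholder address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "biz-consumer-group");          // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Caps how many records a single poll() may return, i.e. the "packet" size discussed above.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        return new KafkaConsumer<>(props);
    }
}
```

Whatever the actual mechanism in the service, the larger the batch per poll, the more memory the receiver can commit to msgQueue in a single step.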
After scaling the service instances out by 100/40 = 2.5 times (the packet size had grown from 40 to 100 messages), the OOM problem was solved.
A few terminology explanations of MAT
- Incoming references and outgoing references
```java
import java.util.concurrent.LinkedBlockingQueue;

public class Foo {
    private LinkedBlockingQueue msgQueue;
}
```
For an object of class Foo above, the reference relationship is Foo --> msgQueue. From Foo's perspective, msgQueue is an outgoing reference; from msgQueue's perspective, Foo is an incoming reference.
- Shallow heap and retained heap
For class Foo above, a Foo object by itself is just an object header, its fields, and a 64-bit pointer to msgQueue; that is its shallow heap. But if the Foo object can be collected, msgQueue can be collected along with it in cascade, so the memory freed by collecting Foo together with msgQueue is Foo's retained heap. By analogy, a person on their own (shallow) may not amount to much, but counting everything they hold, such as their phone, bank cards, and other assets (retained), they might be worth hundreds of millions.
Conclusions and Reflections
Although the final conclusion is simple and the problem could be solved by horizontally scaling the instances, the process and experience of the investigation are valuable. At the same time, we should reflect on why this happened. As the code shows, msgQueue has a fixed capacity of 3000.
However, according to the memory dump analysis, the OOM occurred when each LinkedBlockingQueue held only about 285 elements. Even converting for the packet size having grown 2.5 times, that is the equivalent of only 285 * 2.5 = 712.5 elements at the previous packet size, still far below the 3000-element limit.
If the capacity limit of msgQueue were set reasonably, the receiver's insert operation would block when the limit is reached and wait for downstream consumption; even with slow consumption, the streaming pipeline would keep running normally instead of running out of memory. Therefore, the root cause of this case lies in the unreasonable capacity setting of the LinkedBlockingQueue.
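As a sketch of that reflection: a bounded queue only applies backpressure if the producer uses a blocking insert and the capacity is sized against available memory rather than an arbitrary element count. The snippet below illustrates the idea with an assumed, smaller capacity and a blocking put in the receive loop; it shows the principle, not the actual fix adopted in this case (which was horizontal scaling).

```java
import java.util.concurrent.LinkedBlockingQueue;

// Illustration: a memory-aware capacity plus a blocking insert turns the queue into backpressure.
public class BoundedReceiveLoop {

    // Illustrative capacity only; a real value should be derived from heap size and packet size.
    private final LinkedBlockingQueue<Msg> msgQueue = new LinkedBlockingQueue<>(500);
    private volatile boolean running = true;

    public void receiveLoop(MessageSource kafka) throws InterruptedException {
        while (running) {
            Msg msg = kafka.receive();
            // put() blocks while the queue is full, so a slow downstream stalls the receiver
            // instead of letting messages pile up until the heap is exhausted.
            msgQueue.put(msg);
        }
    }

    // Minimal stand-ins so the sketch compiles on its own; the real service's types differ.
    interface MessageSource { Msg receive(); }
    static class Msg { }
}
```

Note that the receive pseudocode earlier uses offer(), which returns false immediately when a bounded queue is full rather than blocking, so a tight capacity alone could silently drop messages; put(), or offer() with a timeout plus retry, is what turns the capacity limit into real backpressure.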
References
- [^1] Distributed Messaging Middleware Practices
- [^2] Understanding Computer Systems in Depth