Recently, while adding request tracing, I put a trace ID into our WebSocket interceptor, and then noticed data getting mixed up: user A's information was showing up in user B's session. So I started digging into the source code to find out where the problem was.
Background
In the project we use spring-boot-starter-websocket for WebSocket communication. The user's identity is read in determineUser of our HandshakeHandler implementation, where we also create a traceId for request tracing and store it in a ThreadLocal.
The TextWebSocketHandler implementation then simply reads the traceId back from that ThreadLocal; calls like this appear all over afterConnectionEstablished, afterConnectionClosed, and handleMessage. A rough sketch of this (problematic) wiring follows.
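The sketch below is illustrative only: TraceContext is a hypothetical ThreadLocal holder standing in for our real trace context, and the class names are made up, not our actual code.

import java.security.Principal;
import java.util.Map;
import java.util.UUID;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

// Hypothetical ThreadLocal holder for the traceId.
class TraceContext {
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();
    static void set(String id) { TRACE_ID.set(id); }
    static String get() { return TRACE_ID.get(); }
    static void clear() { TRACE_ID.remove(); }
}

// The handshake handler reads the user's identity and creates the traceId,
// but keeps it only in a ThreadLocal bound to the handshake thread.
class TracingHandshakeHandler extends DefaultHandshakeHandler {
    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                                      Map<String, Object> attributes) {
        TraceContext.set(UUID.randomUUID().toString());
        return super.determineUser(request, wsHandler, attributes);
    }
}

// The message handler later reads the traceId back from the ThreadLocal,
// even though its callbacks may run on different (and reused) threads.
class AppMessageHandler extends TextWebSocketHandler {
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        // TraceContext.get() here may return null or even another user's traceId,
        // because this callback runs on a different, reused Tomcat thread.
    }
}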
In a test environment with almost no concurrency, everything looked fine, and in the end we shipped the feature without any load testing.
Oh my god!
Soon after it went live, data started getting mixed up; for example, user A's traceId showed up on user B's request chain. We had been through a similar situation before with a Feign-related multithreading problem, and now WebSocket had caught us the same way, so my first reaction was that this was again a multithreading issue.
We immediately went back to the test environment and ran 100 concurrent users, and sure enough, the problem reproduced.
Analysis
So, let's get to work on this multithreading problem. As usual, the first step is to print the thread ID and see whether threads are being reused, so we log the current thread in the first line of afterConnectionEstablished, afterConnectionClosed, and handleTextMessage:
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    log.info(
        "Thread afterConnectionEstablished {} -> {}",
        session.getId(),
        Thread.currentThread().getId());
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
    log.info(
        "Thread afterConnectionClosed {} -> {}", session.getId(), Thread.currentThread().getId());
}

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
    log.info("Thread handleTextMessage {} -> {}", session.getId(), Thread.currentThread().getId());
}
Then we ran the pressure-test script with JMeter and, as expected, the same thread ID (103) appeared many times, and it even appeared across different sessions.
So it is natural to conclude that this comes from Tomcat handling requests with multiple threads. The fix is simply to store the traceId in session.attributes (the session's contents do not change), and then, in each WebSocket event-handling method, reset the ThreadLocal traceId from the value stored in the session before running any business logic; a sketch of this follows.
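A hedged sketch of the fix, reusing the hypothetical TraceContext holder from the sketch above; the "traceId" attribute key and the class names are assumptions, not our actual code.

import java.security.Principal;
import java.util.Map;
import java.util.UUID;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

class FixedHandshakeHandler extends DefaultHandshakeHandler {
    static final String TRACE_ID_ATTR = "traceId"; // assumed attribute key

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                                      Map<String, Object> attributes) {
        // Handshake attributes are copied onto the WebSocketSession, so the
        // traceId now travels with the session instead of with a thread.
        attributes.put(TRACE_ID_ATTR, UUID.randomUUID().toString());
        return super.determineUser(request, wsHandler, attributes);
    }
}

class FixedMessageHandler extends TextWebSocketHandler {

    // Reset the ThreadLocal from the session before any business logic runs.
    private void restoreTrace(WebSocketSession session) {
        TraceContext.set((String) session.getAttributes().get(FixedHandshakeHandler.TRACE_ID_ATTR));
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        restoreTrace(session);
        // ... business logic ...
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        restoreTrace(session);
        // ... business logic ...
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        restoreTrace(session);
        // ... business logic ...
    }
}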
Validation from the source code
Here is our JMeter test script. It uses the JMeterWebSocketSamplers-1.2.8.jar plugin (installation tutorial), and the plan is: WebSocket Open Connection + fixed time delay + WebSocket Single Write Sampler + WebSocket Close.
A few notes on the samplers:

WebSocket Open Connection: opens the WebSocket connection.
Fixed time delay: the delay simulates a message being pushed some time after the connection is made, which is what lets us exercise thread reuse; 10ms is enough here, and it should stay well below the overall run time so that the concurrent threads below actually overlap.
WebSocket Single Write Sampler: simulates the message push; for the connection, choose "use existing connection".
WebSocket Close: closes the WebSocket connection.
With the test script configured, we first ran it with the number of threads set to 1; the backend log output was as follows:
INFO c.f.l.w.h.a.AppBaseMessageHandler Thread afterConnectionEstablished fca7eba3-c5e2-a05e-73ed-08d144a021a9 -> 100
INFO c.f.l.w.h.a.AppBaseMessageHandler Thread handleTextMessage fca7eba3-c5e2-a05e-73ed-08d144a021a9 -> 104
INFO c.f.l.w.h.a.AppBaseMessageHandler Thread afterConnectionClosed fca7eba3-c5e2-a05e-73ed-08d144a021a9 -> 103
As you can see, three different threads are used in the background for connecting, sending, and disconnecting.
So where do these threads come from? Let's set a breakpoint (for example in afterConnectionClosed) and look at the thread stack.
The thread running afterConnectionClosed (ID=103) is http-nio-8840-exec-2, i.e. it comes from the http-nio-xx pool, which is the thread pool maintained by Tomcat's NioEndpoint.
class NioEndpoint extends AbstractJsseEndpoint<NioChannel, SocketChannel> {
    @Override
    public void startInternal() throws Exception {
        if (!running) {
            running = true;
            paused = false;
            ...
            // Create worker collection
            if (getExecutor() == null) {
                createExecutor(); // create the thread pool
            }
            ...
        }
    }
    ...
}

class AbstractEndpoint {
    public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS, taskqueue, tf);
        taskqueue.setParent((ThreadPoolExecutor) executor);
    }
}
What does NioEndpoint do? It is Tomcat's endpoint class for socket traffic: every request to Tomcat goes through it, gets handed to a thread taken from the pool, is processed by the appropriate request processor, and eventually reaches callbacks such as our afterConnectionClosed. The details are standard Tomcat knowledge and are not covered here.
An important point here is that Tomcat's Poller calls AbstractEndpoint.processSocket to process socket events. Let's look at the code:
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        // The Runnable execution unit (SocketProcessorBase) is our worker
        SocketProcessorBase<S> sc = null;
        if (processorCache != null) {
            sc = processorCache.pop();
        }
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event); // create a new worker
        } else {
            sc.reset(socketWrapper, event); // reset the worker's information!!
        }
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc); // run the worker on the thread pool
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper), ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
So, from the source code it is clear that the thread that ultimately handles our WebSocket events comes from a pool and is very likely to be reused. The SocketProcessorBase unit of work is also cached and reused, but Tomcat resets its socket-related state every time a task is executed.
Therefore, if we store per-request information in a ThreadLocal on a thread that is likely to be reused (for example, a core pool thread) and do not reset it in time, we get exactly this kind of data corruption.
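To make the failure mode concrete, here is a tiny self-contained illustration (not our real code): two tasks share a single-thread pool, and the second one sees the first one's ThreadLocal value because the thread, together with its ThreadLocal slot, is reused.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalReuseDemo {
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1); // force thread reuse

        // Task for "user A": sets the ThreadLocal but never clears it.
        pool.submit(() -> TRACE_ID.set("trace-of-user-A")).get();

        // Task for "user B": never sets anything, yet inherits A's value
        // because it runs on the same pooled thread.
        pool.submit(() -> System.out.println("user B sees: " + TRACE_ID.get())).get();

        pool.shutdown();
    }
}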
The bottom line: when using thread-local variables, never forget that Tomcat reuses its worker threads.
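One defensive habit that follows from this, shown as an illustrative sketch using the hypothetical TraceContext from earlier: pair every set() with a clear() in a finally block, so a pooled thread never carries stale state into its next task.

// Illustrative only; the "traceId" session attribute key is an assumption.
void handleWithTrace(WebSocketSession session, Runnable businessLogic) {
    TraceContext.set((String) session.getAttributes().get("traceId"));
    try {
        businessLogic.run();
    } finally {
        TraceContext.clear(); // leave the reused thread clean for its next task
    }
}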