Github: github.com/black-ant CASE Backup: gitee.com/antblack/ca…
1. Preface
While going back over this series, I noticed that the part where the Server side accepts requests was missing. This part is a bit convoluted, because several threads are created to process requests, so this article fills the gap.
Seata's Server and Client communicate mainly through Netty.
The main purpose of communication between Client and Server is to create a transaction GlobalSession and to register each Branch into that GlobalSession.
2. Client-side request
The previous article described the Client call flow.
// C- DefaultTransactionManager: the Begin of a GlobalSession is initiated here
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
    // Initiate a Netty request
    // timeout=300000,transactionName=dubbo-gts-seata-example
    return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
}
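The call above blocks until the Server answers, even though the underlying transport is asynchronous. How Seata implements this internally is not shown in this article, so the following is only a minimal sketch of the general technique: each request gets an id, a future is parked under that id, and the response handler completes it. The class SyncCallSketch, its transport callback, and the String payloads are all hypothetical stand-ins.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/** Hypothetical sketch: a blocking call layered on an asynchronous transport. */
final class SyncCallSketch {
    private final AtomicInteger nextId = new AtomicInteger();
    // Requests that were sent but not yet answered, keyed by message id.
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    // Stand-in for the network write; receives (message id, request body).
    private final BiConsumer<Integer, String> transport;

    SyncCallSketch(BiConsumer<Integer, String> transport) {
        this.transport = transport;
    }

    /** Sends the request and blocks until the matching response arrives or the timeout expires. */
    String sendSyncRequest(String request, long timeoutMillis) throws Exception {
        int id = nextId.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        try {
            transport.accept(id, request);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            // Always clean up, whether the call succeeded, failed, or timed out.
            pending.remove(id);
        }
    }

    /** Called by the receiving side when a response with the given id comes back. */
    void onResponse(int id, String response) {
        CompletableFuture<String> future = pending.get(id);
        if (future != null) {
            future.complete(response);
        }
    }

    public static void main(String[] args) throws Exception {
        SyncCallSketch[] holder = new SyncCallSketch[1];
        // Fake transport: answers on another thread, as a remote server would.
        holder[0] = new SyncCallSketch((id, body) ->
                CompletableFuture.runAsync(() -> holder[0].onResponse(id, "ack:" + body)));
        System.out.println(holder[0].sendSyncRequest("begin", 1000)); // prints ack:begin
    }
}
```

The timeout matters here: without it, a lost response would block the calling thread forever.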
3. Seata Server processing
Seata sends a GlobalLockRequest to the Server via Netty.
// Entry class: AbstractNettyRemotingServer
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    // ...
}
// Call process:
C- AbstractNettyRemotingServer # channelRead
C- AbstractNettyRemoting # processMessage
// RM registration
RegRmProcessor
//
BatchLogHandler
DefaultCoordinator
AbstractNettyRemotingServer
3.1 Request entry point
// Step 1: Netty request entry
C- AbstractNettyRemotingServer
M- channelRead(final ChannelHandlerContext ctx, Object msg)
- processMessage(ctx, (RpcMessage) msg)
// Step 2: Process Message
C- AbstractNettyRemoting
M- processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage)
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // Look up the Pair (processor + executor) for this message type -> 3.2
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            if (pair.getSecond() != null) {
                try {
                    // Step 1: Hand the task to the ExecutorService, which supplies the thread
                    pair.getSecond().execute(() -> {
                        try {
                            // Step 2: Execute process
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            // logging omitted
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // The executor rejected the task: dump the thread stacks once via jstack
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        int idx = new Random().nextInt(100);
                        try {
                            Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
                        } catch (IOException exx) {
                            // logging omitted
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                // No executor registered for this type: process on the calling thread
                pair.getFirst().process(ctx, rpcMessage);
            }
        }
    }
}
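The core idea of processMessage is a lookup table from message type code to a processor and an optional executor. Stripped of the Netty and Seata types, it can be sketched as follows. This is a simplified illustration, not Seata's code: DispatchSketch, Processor, and Entry are hypothetical names, and message bodies are plain Strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a type-code dispatch table; these are not Seata's classes. */
final class DispatchSketch {
    /** Simplified stand-in for a processor: handles one message body. */
    interface Processor {
        void process(String body);
    }

    /** A processor together with the (possibly null) executor it should run on. */
    static final class Entry {
        final Processor processor;
        final ExecutorService executor;

        Entry(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> table = new HashMap<>();

    void register(int typeCode, Processor processor, ExecutorService executor) {
        table.put(typeCode, new Entry(processor, executor));
    }

    /** Returns false when no processor is registered for the type code. */
    boolean dispatch(int typeCode, String body) {
        Entry entry = table.get(typeCode);
        if (entry == null) {
            return false;
        }
        if (entry.executor != null) {
            // Run on a pool thread so the caller is not blocked.
            entry.executor.execute(() -> entry.processor.process(body));
        } else {
            // No executor: run directly on the calling thread.
            entry.processor.process(body);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.register(1, body -> System.out.println(
                "pooled: " + body + " on " + Thread.currentThread().getName()), pool);
        dispatcher.register(2, body -> System.out.println(
                "inline: " + body + " on " + Thread.currentThread().getName()), null);
        dispatcher.dispatch(1, "global begin");
        dispatcher.dispatch(2, "heartbeat");
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Registering a null executor is a cheap way to keep lightweight messages off the pool, at the cost of running them on whichever thread calls dispatch.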
3.2 ExecutorService execution
Initialization process of the ExecutorService
As the class structure shows, the top-level abstract class is AbstractNettyRemoting, and it holds two ExecutorServices
// Scheduled executor that runs periodically after init
ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true));
// Used for Netty Request processing. This object is passed in through the constructor at Server initialization
ThreadPoolExecutor messageExecutor;
// PS: Here's a refresher
C- Server # main
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
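The pool built in Server # main combines a bounded queue with CallerRunsPolicy. To see what that combination does under load, here is a small standalone sketch using only java.util.concurrent. The sizes (1 worker, queue of 1) are illustrative values chosen to force saturation, not Seata's configuration, and PoolSketch is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a bounded pool that falls back to the submitting thread when saturated. */
final class PoolSketch {
    /** Builds a bounded pool; all sizes passed in are illustrative. */
    static ThreadPoolExecutor newWorkerPool(int core, int max, int queueSize) {
        return new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueSize),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newWorkerPool(1, 1, 1);
        CountDownLatch release = new CountDownLatch(1);
        // Task 1 occupies the single worker, task 2 fills the queue.
        pool.execute(() -> await(release));
        pool.execute(() -> { });
        // Task 3 cannot be queued, so CallerRunsPolicy runs it on the submitting thread.
        pool.execute(() -> System.out.println("ran on " + Thread.currentThread().getName())); // prints ran on main
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

With this policy the submitting thread slows down instead of tasks being dropped, which acts as a simple form of back pressure.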
The process of using ExecutorService
// As shown above, the threads are created by NamedThreadFactory
public Thread newThread(Runnable r) {
    String name = prefix + "_" + counter.incrementAndGet();
    if (totalSize > 1) {
        name += "_" + totalSize;
    }
    // Group : java.lang.ThreadGroup[name=main,maxpri=10]
    // name : ServerHandlerThread_1_18_500
    Thread thread = new FastThreadLocalThread(group, r, name);
    thread.setDaemon(makeDaemons);
    if (thread.getPriority() != Thread.NORM_PRIORITY) {
        thread.setPriority(Thread.NORM_PRIORITY);
    }
    return thread;
}
// PS: Why FastThreadLocalThread? FastThreadLocal is said to have about 3 times the throughput of ThreadLocal
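To make the naming scheme easier to follow, here is a reduced thread factory built on the JDK only. It is a sketch under assumptions: NamedFactorySketch is a hypothetical class, it creates plain Thread objects rather than Netty's FastThreadLocalThread, and it leaves out the thread group, so the names it produces are shorter than the ServerHandlerThread_1_18_500 seen above.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a thread factory that gives pool threads descriptive names. */
final class NamedFactorySketch implements ThreadFactory {
    private final String prefix;
    private final int totalSize;
    private final boolean daemon;
    // Per-factory counter, so every thread gets a distinct suffix.
    private final AtomicInteger counter = new AtomicInteger();

    NamedFactorySketch(String prefix, int totalSize, boolean daemon) {
        this.prefix = prefix;
        this.totalSize = totalSize;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        String name = prefix + "_" + counter.incrementAndGet();
        if (totalSize > 1) {
            // Append the pool size so the name shows "thread N of totalSize".
            name += "_" + totalSize;
        }
        // Plain Thread here; the code above uses FastThreadLocalThread instead.
        Thread thread = new Thread(r, name);
        thread.setDaemon(daemon);
        thread.setPriority(Thread.NORM_PRIORITY);
        return thread;
    }

    public static void main(String[] args) {
        NamedFactorySketch factory = new NamedFactorySketch("ServerHandlerThread", 500, true);
        Thread thread = factory.newThread(() -> { });
        System.out.println(thread.getName() + " daemon=" + thread.isDaemon());
        // prints ServerHandlerThread_1_500 daemon=true
    }
}
```

Descriptive names pay off when reading a jstack dump, since each thread can be traced back to the pool that created it.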
3.3 Processor processing
3.3.1 Processor loading process
Processor loading is performed in NettyRemotingServer and can be described in 3 steps:
Step 1: Server # main calls nettyRemotingServer.init().
Step 2: init registers the various processors.
Step 3: Each registration builds a Pair, which is put into a collection for use at runtime.
// Step 1: Server # main
public static void main(String[] args) throws IOException {
    // ... other logic omitted; Netty is initialized here
    try {
        nettyRemotingServer.init();
    } catch (Throwable e) {
        System.exit(-1);
    }
    System.exit(0);
}
// Step 2: init initialization
public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();
    }
}
// Step 3: Register the Processor
private void registerProcessor() {
    ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    //------
    ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
    //------
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
    //------
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
    //------
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
// PS: As you can see, there are roughly the following types of processors:
// - ServerOnRequestProcessor: processes request messages from RM/TM clients
// - ServerOnResponseProcessor: processes response messages from RM/TM clients
// - RegRmProcessor: RM registration processor
// - RegTmProcessor: TM registration processor
// - ServerHeartbeatProcessor: processes heartbeat messages
// PS: You can also see that a messageExecutor (ThreadPoolExecutor) is passed in; it is used later to run the processing on pool threads
// Step End: As you can see, the Pair is built here for later use
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
    Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
    this.processorTable.put(messageType, pair);
}
// The table itself is a field of AbstractNettyRemoting
protected final HashMap<Integer, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
3.2 Processing requests
// Step 1: Enter process handling (ServerOnRequestProcessor)
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    onRequestMessage(ctx, rpcMessage);
}
// Step 2: Process Request
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
    Object message = rpcMessage.getBody();
    RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
    // Log operation omitted
    if (!(message instanceof AbstractMessage)) {
        return;
    }
    if (message instanceof MergedWarpMessage) {
        AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];
        // A merged message carries multiple requests
        for (int i = 0; i < results.length; i++) {
            final AbstractMessage subMessage = ((MergedWarpMessage) message).msgs.get(i);
            // 3.3 Call Handler to process Message
            results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);
        }
        MergeResultMessage resultMessage = new MergeResultMessage();
        resultMessage.setMsgs(results);
        remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
    } else {
        // A single, separate request
        final AbstractMessage msg = (AbstractMessage) message;
        // 3.3 Call Handler to process Message
        AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
        remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
    }
}
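The merged branch above relies on one invariant: result i answers sub-request i, so the client can match them up by position. A stripped-down sketch of that branching, with hypothetical Single and Merged classes and String results standing in for the Seata message types:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: handling either one request or a merged batch of requests. */
final class MergeSketch {
    /** Stand-in for a single request message. */
    static final class Single {
        final String body;

        Single(String body) {
            this.body = body;
        }
    }

    /** Stand-in for a merged message that wraps several single requests. */
    static final class Merged {
        final List<Single> msgs;

        Merged(List<Single> msgs) {
            this.msgs = msgs;
        }
    }

    /**
     * Returns a String[] for a Merged message (one result per sub-request,
     * in the same order) and a single String for a Single message.
     */
    static Object onRequest(Object message, Function<Single, String> handler) {
        if (message instanceof Merged) {
            List<Single> msgs = ((Merged) message).msgs;
            String[] results = new String[msgs.size()];
            for (int i = 0; i < results.length; i++) {
                // Position i of the results corresponds to position i of the batch.
                results[i] = handler.apply(msgs.get(i));
            }
            return results;
        }
        return handler.apply((Single) message);
    }

    public static void main(String[] args) {
        Function<Single, String> handler = s -> "ok:" + s.body;
        Merged batch = new Merged(Arrays.asList(new Single("begin"), new Single("commit")));
        System.out.println(Arrays.toString((String[]) onRequest(batch, handler))); // prints [ok:begin, ok:commit]
        System.out.println(onRequest(new Single("status"), handler)); // prints ok:status
    }
}
```

Batching trades a little latency for fewer network round trips, which is presumably why a merged type exists alongside the single-request path.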
3.3 Handlers process each Message
As you can see, there are multiple MessageHandlers for processing
// The core handler is TransactionMessageHandler: it processes the received RPC messages at the upper layer
I- TransactionMessageHandler
M- AbstractResultMessage onRequest(AbstractMessage request, RpcContext context)
M- void onResponse(AbstractResultMessage response, RpcContext context)
C- DefaultCoordinator
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (!(request instanceof AbstractTransactionRequestToTC)) {
        throw new IllegalArgumentException();
    }
    AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
    transactionRequest.setTCInboundHandler(this);
    return transactionRequest.handle(context);
}
C- AbstractTCInboundHandler
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
    GlobalBeginResponse response = new GlobalBeginResponse();
    exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
        @Override
        public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
            try {
                // Start Global transaction processing
                doGlobalBegin(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
                    e);
            }
        }
    }, request, response);
    return response;
}
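The onRequest method above does not switch on the request type. It hands itself to the request and lets the request call back the matching handle overload. This is a form of double dispatch. A minimal sketch of the pattern follows, with hypothetical names (DoubleDispatchSketch, InboundHandler, BeginRequest, CommitRequest, Coordinator) and String results in place of the real response objects:

```java
/** Hypothetical sketch of the "request calls back into the handler" pattern. */
final class DoubleDispatchSketch {
    /** One overload per concrete request type. */
    interface InboundHandler {
        String handle(BeginRequest request);

        String handle(CommitRequest request);
    }

    /** Base request: remembers the handler and knows how to dispatch itself. */
    abstract static class Request {
        private InboundHandler handler;

        void setHandler(InboundHandler handler) {
            this.handler = handler;
        }

        InboundHandler handler() {
            return handler;
        }

        abstract String handle();
    }

    static final class BeginRequest extends Request {
        @Override
        String handle() {
            // "this" has static type BeginRequest, so the BeginRequest overload is chosen.
            return handler().handle(this);
        }
    }

    static final class CommitRequest extends Request {
        @Override
        String handle() {
            return handler().handle(this);
        }
    }

    /** Plays the role of the coordinator: entry point plus the typed handlers. */
    static final class Coordinator implements InboundHandler {
        String onRequest(Request request) {
            request.setHandler(this);
            return request.handle();
        }

        @Override
        public String handle(BeginRequest request) {
            return "begin handled";
        }

        @Override
        public String handle(CommitRequest request) {
            return "commit handled";
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        System.out.println(coordinator.onRequest(new BeginRequest()));  // prints begin handled
        System.out.println(coordinator.onRequest(new CommitRequest())); // prints commit handled
    }
}
```

Adding a new request type then means adding one class and one overload, and the compiler flags every handler that has not implemented it yet.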
3.4 Starting Processing
As you can see, doGlobalBegin is invoked in 3.3. From this point on, the flow links up with the Session part.
public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
    // ...
}
4. Supplement
4.1 Seata RM registration
// On the Server, RM registration is handled by RegRmProcessor
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    onRegRmMessage(ctx, rpcMessage);
}
// Initiate Server registration
private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
    // RegisterRMRequest{resourceIds='null', applicationId='business-seata-example', transactionServiceGroup='business-service-seata-service-group'}
    RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
    String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
    boolean isSuccess = false;
    String errorInfo = StringUtils.EMPTY;
    try {
        if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
            // ChannelManager registers the current channel
            ChannelManager.registerRMChannel(message, ctx.channel());
            // Record the client version for this Channel
            Version.putChannelVersion(ctx.channel(), message.getVersion());
            isSuccess = true;
        }
    } catch (Exception exx) {
        isSuccess = false;
        errorInfo = exx.getMessage();
    }
    // Build the response
    RegisterRMResponse response = new RegisterRMResponse(isSuccess);
    if (StringUtils.isNotEmpty(errorInfo)) {
        response.setMsg(errorInfo);
    }
    remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
}
RegTmProcessor works the same way, so it is not covered here. The Channel management and version control may be worth a closer look in the future.
4.2 ServerHeartbeatProcessor Heartbeat detection
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    try {
        // Reply to the PING with a PONG
        remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), HeartbeatMessage.PONG);
    } catch (Throwable throwable) {
        LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
    }
    // Apart from the reply, it only writes a log
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("received PING from {}", ctx.channel().remoteAddress());
    }
}
Conclusion
With this, the chain is finally complete and the whole Seata flow has been walked through. I do not plan to start new articles on it for now; instead I will polish the existing ones and add details.
To summarize: ThreadPoolExecutor and NettyRemotingServer are initialized in Server # main, and incoming Netty requests are handled in AbstractNettyRemotingServer.