RocketMQ processes deployed on servers are commonly referred to as Brokers. A Broker receives messages from producers, persists them locally, and delivers them to consumers. Brokers are usually deployed as a cluster, with data synchronized between master and slave.
The Broker and NameServer
The Broker registers itself (including its topic information) with every NameServer and maintains a heartbeat connection to each of them.
- Connection: a single Broker maintains long-lived connections to all NameServers.
- Heartbeat interval: every 30 seconds by default, the Broker sends a heartbeat containing its own topic configuration to all NameServers (the startup code below clamps the configured period to between 10 and 60 seconds).
- Heartbeat timeout: the NameServer scans all live Broker connections every 10 seconds (this interval cannot be changed). If a connection has not sent a heartbeat within 2 minutes, that is, the current time minus the last update time exceeds 2 minutes (this value cannot be changed either), the NameServer closes the connection.
- Disconnection timing: the Broker fails, or the heartbeat times out and the NameServer closes the connection.
- Disconnection action: once the connection is closed, the NameServer notices immediately and updates the topic-to-queue mapping, but it does not notify producers or consumers.
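The timeout rule above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the class BrokerLivenessTable and its method names are hypothetical and are not RocketMQ's own classes; only the 2-minute threshold and the idea of a periodic scan come from the text.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the NameServer-side liveness table.
class BrokerLivenessTable {
    // A broker is considered dead if no heartbeat arrived within 2 minutes.
    static final long EXPIRE_MILLIS = 2 * 60 * 1000L;

    // Broker address -> timestamp of the last heartbeat received.
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Called whenever a heartbeat (registration request) arrives.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Called periodically (every 10 seconds in the text above).
    // Removes expired brokers and returns how many were removed.
    int scanNotActiveBroker(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > EXPIRE_MILLIS) {
                // A real NameServer would also close the channel and update its routing tables here.
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a production scan would read the clock itself.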
What does the Broker do at startup
Broker startup begins in BrokerStartup.main():
public static void main(String[] args) {
// Build the BrokerController
start(createBrokerController(args));
}
public static BrokerController start(BrokerController controller) {
try {
// Start the controller
controller.start();
// ... code omitted
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
The code is clear. First look at the code that builds BrokerController:
public static BrokerController createBrokerController(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
try {
// Parse command line arguments
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
final BrokerConfig brokerConfig = new BrokerConfig();
// The netty server is configured to communicate with producers
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// Netty client configuration, used to communicate with the NameServer
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
// If this broker is a slave node
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
// Parse the -c argument on the command line
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
// ... code omitted
}
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// Get the NameServer address
String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
try {
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
// Validate each address by parsing it
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
}
}
// Master/slave settings
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
if (brokerConfig.getBrokerId() <= 0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}
break;
default:
break;
}
// Whether DLedger is enabled
if (messageStoreConfig.isEnableDLegerCommitLog()) {
brokerConfig.setBrokerId(-1);
}
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
// ... code omitted
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
// Initialize the controller
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// Register a JVM shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
We only need the overall flow of this code, not the details: it parses the command-line arguments into the various config objects, creates a new BrokerController with those configs, and then initializes the BrokerController.
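To make the "parse properties into a config object" step concrete, here is a minimal sketch of binding a Properties file onto a config object through its setters. It is an assumption-laden illustration, not the code of MixAll.properties2Object: DemoBrokerConfig and PropertiesBinder are hypothetical names, and only String, int, long, and boolean setters are handled.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical config class, standing in for BrokerConfig.
class DemoBrokerConfig {
    private String brokerName = "DefaultBroker";
    private int listenPort = 10911;

    public String getBrokerName() { return brokerName; }
    public void setBrokerName(String brokerName) { this.brokerName = brokerName; }
    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
}

// Hypothetical helper: copies matching properties onto the target via setXxx methods.
class PropertiesBinder {
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setBrokerName -> property key "brokerName"
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // keep the default when the property is absent
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == String.class) {
                    m.invoke(target, value);
                } else if (type == int.class) {
                    m.invoke(target, Integer.parseInt(value));
                } else if (type == long.class) {
                    m.invoke(target, Long.parseLong(value));
                } else if (type == boolean.class) {
                    m.invoke(target, Boolean.parseBoolean(value));
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot set property " + key, e);
            }
        }
    }
}
```

Applying the file first and the command line second, as the startup code does, means later sources simply overwrite earlier values.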
Next, controller.initialize():
public boolean initialize() throws CloneNotSupportedException {
// Load the configuration file from disk
boolean result = this.topicConfigManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
if (result) {
try {
// Create a message store management component
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
// Whether DLedger is enabled
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
// The statistical component of the broker
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
result = result && this.messageStore.load();
if (result) {
// Build netty server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
// Create various thread pools
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
// Thread pool for pulling messages
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(...);
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(...);
// ... code omitted
// The thread pool for heartbeat processing
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true));
this.registerProcessor();
// Various background scheduled tasks
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
// Record broker statistics once a day
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
//consumerOffset
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// ... code omitted
// Set the NameServer address list
if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
// Alternatively, fetch the NameServer addresses periodically from an address server
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// Without DLedger: classic master/slave HA setup
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
// ... code omitted
// Transactions, ACL, and RPC hooks
initialTransaction();
initialAcl();
initialRpcHooks();
}
return result;
}
The code here is a little clearer, so let’s look at the main flow.
- First, load the configuration files from disk.
- Create the message storage component, DefaultMessageStore.
- Build the Netty server.
- Create the various thread pools (message receiving, heartbeat handling, and so on).
- Create the background scheduled tasks.
After initialization, look at the startup code, controller.start():
public void start() throws Exception {
// Start the message store component
if (this.messageStore != null) {
this.messageStore.start();
}
// Start the Netty servers
if (this.remotingServer != null) {
this.remotingServer.start();
}
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
// Outbound communication component, used for example to send heartbeats to the NameServer
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
// Scheduled task that registers with the NameServers; it doubles as the heartbeat (30 seconds by default)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}
All you need to take from this code is that BrokerController.start() starts every functional component, starts the Netty servers, and schedules a task that registers with the NameServers.
The overall process is:
- Build a controller
- Initialize the controller
- Start the controller
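One detail of the scheduled registration task in start() is worth isolating: the period passed to scheduleAtFixedRate is clamped with Math.max(10000, Math.min(period, 60000)). The sketch below reproduces just that expression; the class and method names are hypothetical.

```java
// Hypothetical helper isolating the period clamp used when scheduling registerBrokerAll.
class RegisterPeriod {
    // Returns the configured period limited to the range [10 s, 60 s], in milliseconds.
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }
}
```

So a configured value of 1 second is raised to 10 seconds, and 2 minutes is lowered to 60 seconds; a value of 30 seconds passes through unchanged.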
How does a Broker register itself with the NameServer
When the Broker starts, a scheduled background task calls BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()) to register.
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// Encapsulate topic information
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
// Determine whether registration is required
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// Register
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
This method contains little logic of its own, so look at the code that actually registers, doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper):
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
// Register this broker with all NameServers
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
// If at least one NameServer returned a result
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
}
This calls a method on the brokerOuterAPI component, so we drill down:
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
// Get the NameServer address list
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// Put the Broker's information in the request header
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
// Request body
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// Wait (up to the timeout) until the registration with every NameServer has finished, then return
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Register with one NameServer
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
There is no real logic here either; it just wraps the request. On to the code that actually sends it:
//BrokerOuterAPI.java
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
// Encapsulate the request
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
// Return without waiting for registration results
if (oneway) {
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
// Send the request through nettyClient
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;
// Process the result
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
At this point the Broker's registration with the NameServer is essentially complete. The final step calls remotingClient.invokeSync to send the request; we will not follow it further here, and interested readers can dig in on their own. Roughly, it establishes a connection through Netty's Bootstrap.connect() and then calls Channel.writeAndFlush() to send the request, with the topic information in the request body and the Broker information in the request header.
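The fan-out used by registerBrokerAll, one task per NameServer and a CountDownLatch awaited with a timeout, can be sketched independently of RocketMQ. FanOutRegistration and its parameters are hypothetical names introduced for illustration; the call argument stands in for the per-server registerBroker request.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of "send to every server in parallel, wait at most timeoutMillis".
class FanOutRegistration {
    static <R> List<R> registerAll(List<String> servers, Function<String, R> call, long timeoutMillis) {
        List<R> results = new CopyOnWriteArrayList<>(); // written from several worker threads
        CountDownLatch latch = new CountDownLatch(servers.size());
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, servers.size()));
        try {
            for (String server : servers) {
                pool.execute(() -> {
                    try {
                        R result = call.apply(server);
                        if (result != null) {
                            results.add(result);
                        }
                    } catch (RuntimeException e) {
                        // A failure on one server must not prevent the others from registering.
                    } finally {
                        latch.countDown(); // always count down, on success and on failure
                    }
                });
            }
            // Returns early once every task has finished, or gives up after the timeout.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```

Counting down in finally is the important design choice: a NameServer that throws or times out still releases the latch, so the caller never waits longer than the timeout.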
How does the Broker receive and store messages from producers
CommitLog
The code entry point where the Broker receives a message and stores it in the CommitLog is:
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
->
org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest(ChannelHandlerContext, RemotingCommand)
[Figure: general flow of the Broker receiving a message and storing it in the CommitLog]
How flushing to disk works
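RocketMQ uses CountDownLatch and CompletableFuture to implement synchronous flushing. Two threads are involved, a main thread and a flush thread: the main thread submits a flush request and blocks in CountDownLatch.await(time), and the flush thread counts the latch down once the data has been flushed.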
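The main-thread and flush-thread handshake can be sketched as follows. This is a simplified model under stated assumptions: SyncFlushSketch, FlushRequest, and putAndFlush are hypothetical names, and the actual disk write is replaced by updating a counter.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of synchronous flushing with a writer thread and a flush thread.
class SyncFlushSketch {
    // One request per write; the writer blocks on the latch until the flush thread is done.
    static final class FlushRequest {
        final long offset;
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean flushOk;

        FlushRequest(long offset) { this.offset = offset; }

        void wakeup(boolean ok) {
            flushOk = ok;
            latch.countDown();
        }

        boolean waitForFlush(long timeoutMillis) throws InterruptedException {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) && flushOk;
        }
    }

    private final BlockingQueue<FlushRequest> requests = new LinkedBlockingQueue<>();
    private volatile long flushedOffset = 0;

    SyncFlushSketch() {
        Thread flusher = new Thread(() -> {
            try {
                while (true) {
                    FlushRequest request = requests.take();
                    // A real broker would force the mapped file to disk here.
                    flushedOffset = Math.max(flushedOffset, request.offset);
                    request.wakeup(true);
                }
            } catch (InterruptedException e) {
                // Interrupted: stop the flush loop.
            }
        }, "FlushThread");
        flusher.setDaemon(true);
        flusher.start();
    }

    // Called by the main (writer) thread: returns true only if the flush finished in time.
    boolean putAndFlush(long offset, long timeoutMillis) {
        FlushRequest request = new FlushRequest(offset);
        requests.add(request);
        try {
            return request.waitForFlush(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    long getFlushedOffset() { return flushedOffset; }
}
```

If the latch is not released within the timeout, the writer reports a flush timeout instead of blocking forever, which is the point of using await(time) rather than a plain await().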
ConsumeQueue and IndexFile
The process above only writes to the CommitLog. The Broker also stores message data in the ConsumeQueue and IndexFile. To do so, the Broker starts a thread, ReputMessageService, which forwards CommitLog updates to task handlers that update the ConsumeQueue and IndexFile.
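A minimal sketch of that forwarding step is shown below. It is not the ReputMessageService implementation: ReputSketch, DispatchRequest, and Dispatcher are hypothetical names, the CommitLog is modeled as an in-memory list, and the loop runs on the caller's thread rather than in the background.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: forward each new CommitLog entry to a list of dispatchers.
class ReputSketch {
    // Describes one message that is already written to the CommitLog.
    static final class DispatchRequest {
        final String topic;
        final int queueId;
        final String key;
        final long commitLogOffset;

        DispatchRequest(String topic, int queueId, String key, long commitLogOffset) {
            this.topic = topic;
            this.queueId = queueId;
            this.key = key;
            this.commitLogOffset = commitLogOffset;
        }
    }

    interface Dispatcher {
        void dispatch(DispatchRequest request);
    }

    // "topic-queueId" -> CommitLog offsets, in arrival order (stands in for ConsumeQueue).
    final Map<String, List<Long>> consumeQueues = new HashMap<>();
    // Message key -> CommitLog offset (stands in for IndexFile).
    final Map<String, Long> keyIndex = new HashMap<>();

    private final List<Dispatcher> dispatchers = new ArrayList<>();
    private int reputFrom = 0; // number of CommitLog entries already dispatched

    ReputSketch() {
        dispatchers.add(r -> consumeQueues
                .computeIfAbsent(r.topic + "-" + r.queueId, k -> new ArrayList<>())
                .add(r.commitLogOffset));
        dispatchers.add(r -> {
            if (r.key != null) {
                keyIndex.put(r.key, r.commitLogOffset);
            }
        });
    }

    // One iteration of the reput loop: forward every entry not yet dispatched.
    void doReput(List<DispatchRequest> commitLog) {
        while (reputFrom < commitLog.size()) {
            DispatchRequest request = commitLog.get(reputFrom++);
            for (Dispatcher dispatcher : dispatchers) {
                dispatcher.dispatch(request);
            }
        }
    }
}
```

Because the dispatch position is remembered, calling doReput again after more entries are appended only processes the new ones.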
How does the Broker clean up data on disk
Because the Broker stores its data on disk, the disk would eventually fill up as data accumulates. To prevent this, the Broker starts a background thread that scans the files on disk and deletes those older than 72 hours, which means RocketMQ keeps data for only three days by default. Deletion is triggered under the following conditions:
- At 4 a.m.
- Disk usage exceeds 85%: writes are still accepted, but the delete task starts immediately.
- Disk usage exceeds 90%: writes are rejected and files are deleted immediately.
The cleanup traverses the files and deletes any file that has not been modified for more than 72 hours.
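The conditions above can be written as three small predicates. The thresholds (4 a.m., 85%, 90%, 72 hours) come from the text; the class CleanupPolicy and its method names are hypothetical, and the real Broker reads these values from its configuration.

```java
// Hypothetical sketch of the cleanup rules listed above.
class CleanupPolicy {
    static final long RETENTION_MILLIS = 72L * 60 * 60 * 1000; // 72 hours

    // A cleanup pass runs at 4 a.m. or as soon as disk usage exceeds 85%.
    static boolean shouldRunCleanup(int hourOfDay, double diskUsedRatio) {
        return hourOfDay == 4 || diskUsedRatio > 0.85;
    }

    // Above 90% disk usage the broker stops accepting writes.
    static boolean writesBlocked(double diskUsedRatio) {
        return diskUsedRatio > 0.90;
    }

    // A file is eligible for deletion once it has not been modified for more than 72 hours.
    static boolean isExpired(long lastModifiedMillis, long nowMillis) {
        return nowMillis - lastModifiedMillis > RETENTION_MILLIS;
    }
}
```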
Storage structure
The Broker's storage is divided into three parts: CommitLog, ConsumeQueue, and IndexFile. The CommitLog is the disk file that stores the messages themselves. The ConsumeQueue is a logical message queue; it acts like a table of contents, recording where each message sits in the CommitLog. The IndexFile is an index file that lets you look up a message by Message Key or MessageId.
The IndexFile's slotTable plus indexLinkedList can be thought of as a Java HashMap. Each time a message index is added, the hashCode of the message key is computed and taken modulo the total number of slots to find the slot the index belongs in; the default number of slots is 500W (5 million). As with HashMap, IndexFile resolves hash collisions with a linked list. The only difference from HashMap is that the slot holds a pointer to the newest index, because queries always prefer the most recent message. The pointer in each slot is the offset of that index within the IndexFile, as shown in the figure above. Each index entry is 20 bytes, so an entry is easy to locate from its sequence number (offset) in the file. Each entry in turn stores the position of the previous entry in the same slot, and so on, forming a linked list.
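The slot table and linked list described above can be modeled with a few arrays. This is an in-memory sketch under stated assumptions: IndexFileSketch is a hypothetical class, entries are numbered from 1 so that 0 can mean "empty", and the on-disk layout and timestamps of the real IndexFile are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of an IndexFile: slot table + per-slot linked list.
class IndexFileSketch {
    private final int[] slotTable;         // slot -> number of the newest entry in that slot (0 = empty)
    private final int[] keyHashes;         // entry number -> hash of the message key
    private final long[] commitLogOffsets; // entry number -> position of the message in the CommitLog
    private final int[] prevIndex;         // entry number -> previous entry in the same slot (0 = end)
    private int indexCount = 0;

    IndexFileSketch(int slotNum, int maxIndexNum) {
        slotTable = new int[slotNum];
        keyHashes = new int[maxIndexNum + 1];
        commitLogOffsets = new long[maxIndexNum + 1];
        prevIndex = new int[maxIndexNum + 1];
    }

    private static int hashOf(String key) {
        return key.hashCode() & 0x7fffffff; // force a non-negative hash
    }

    // Appends an entry and makes the slot point at it; returns false when the file is full.
    boolean put(String key, long commitLogOffset) {
        if (indexCount >= keyHashes.length - 1) {
            return false;
        }
        int hash = hashOf(key);
        int slot = hash % slotTable.length;
        int entry = ++indexCount;
        keyHashes[entry] = hash;
        commitLogOffsets[entry] = commitLogOffset;
        prevIndex[entry] = slotTable[slot]; // link to the previous head of this slot
        slotTable[slot] = entry;            // the slot now points at the newest entry
        return true;
    }

    // Walks the slot's chain from newest to oldest and collects entries with a matching hash.
    // Different keys with the same hash would also match, so a caller must verify the real key.
    List<Long> query(String key) {
        int hash = hashOf(key);
        List<Long> offsets = new ArrayList<>();
        for (int entry = slotTable[hash % slotTable.length]; entry != 0; entry = prevIndex[entry]) {
            if (keyHashes[entry] == hash) {
                offsets.add(commitLogOffsets[entry]);
            }
        }
        return offsets;
    }
}
```

Because the slot always points at the newest entry, a query returns the most recent message first, which matches the preference described in the text.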
How does the Broker respond to consumer pull requests
- The Broker first locates the ConsumeQueue by topic + queueId, and then uses the offsets stored in the ConsumeQueue to read messages from the CommitLog through its MappedFile.
- The result of the lookup is then processed; if messages were found, they are returned in the response to the consumer's request.
- If no message can be pulled, the request is suspended and waits for a background scheduled task to process it.
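The three steps above can be sketched with in-memory stand-ins. PullSketch and its fields are hypothetical names: the CommitLog is a list of message bodies, each ConsumeQueue is a list of positions into it, and suspended requests are just recorded rather than retried by a background task.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of serving a pull request from ConsumeQueue + CommitLog.
class PullSketch {
    final List<String> commitLog = new ArrayList<>();                  // position = physical offset
    final Map<String, List<Integer>> consumeQueues = new HashMap<>();  // "topic-queueId" -> CommitLog positions
    final List<String> suspended = new ArrayList<>();                  // held requests, as "topic-queueId@offset"

    // Stores a message: append to the CommitLog, then record its position in the ConsumeQueue.
    void append(String topic, int queueId, String body) {
        commitLog.add(body);
        consumeQueues.computeIfAbsent(topic + "-" + queueId, k -> new ArrayList<>())
                .add(commitLog.size() - 1);
    }

    // Returns up to maxNums messages starting at the logical queue offset.
    // An empty result means nothing was available and the request was suspended.
    List<String> pull(String topic, int queueId, int queueOffset, int maxNums) {
        String key = topic + "-" + queueId;
        List<Integer> consumeQueue = consumeQueues.getOrDefault(key, new ArrayList<>());
        List<String> found = new ArrayList<>();
        for (int i = Math.max(0, queueOffset); i < consumeQueue.size() && found.size() < maxNums; i++) {
            found.add(commitLog.get(consumeQueue.get(i))); // ConsumeQueue entry -> CommitLog position
        }
        if (found.isEmpty()) {
            suspended.add(key + "@" + queueOffset); // hold the request until new messages arrive
        }
        return found;
    }
}
```

Messages of different topics are interleaved in the one CommitLog, which is why the per-queue list of positions is needed to read a single queue in order.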
How to ensure data reliability in case of Broker exceptions
Abnormal conditions:
2. The Broker crashes abnormally
3. The OS crashes
4. The machine loses power, but power is restored immediately
5. The machine cannot be powered on (critical components such as the CPU, motherboard, or memory may be damaged)
6. The disk device is damaged
Cases up to and including 4 are situations where the hardware resources can be recovered immediately; RocketMQ can ensure that no messages are lost, or that only a small amount of data is lost, depending on whether flushing is synchronous or asynchronous. Cases 5 and 6 are single points of failure that cannot be recovered; once one occurs, all messages on that single node are lost. In both of these cases, asynchronous replication lets RocketMQ ensure that 99% of messages are not lost, though a very small number can still be lost. Synchronous double write avoids the single point entirely, but it inevitably affects performance, so it suits scenarios with high reliability requirements, such as money-related applications.