24. Failure avoidance in RocketMQ high-availability design

To keep its communication with clients simple, the NameServer does not immediately notify clients when a Broker fails. As a result, a Producer may not notice that a Broker is down, keep sending messages to it, and see those sends fail. The failure avoidance mechanism addresses this problem. It is disabled by default. When it is enabled, a Broker on which a send failed is temporarily excluded from the queue selection list.

MQFaultStrategy class

public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Whether to enable the fault delay mechanism
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // Check whether Queue is available
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                // No suitable queue found: fall back to the least-bad Broker in the fault list
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        // Default polling
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}

Key steps when selecting a message queue during route lookup:

  1. Select a message queue according to the polling (round-robin) algorithm
  2. Check the fault list to determine whether the Broker that owns the queue is available
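
The two steps above can be sketched in isolation. This is a simplified illustration, not RocketMQ code: the class `QueueSelectorSketch`, the `markFaulty` method, and the use of plain broker-name strings in place of `MessageQueue` objects are all assumptions made for the example, and the fault list is reduced to a set of broker names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch: each queue is represented only by the name of its Broker.
class QueueSelectorSketch {
    private final AtomicInteger counter = new AtomicInteger(0);
    // Hypothetical stand-in for the fault list: Brokers currently being avoided.
    private final Set<String> avoidedBrokers = new HashSet<>();

    void markFaulty(String brokerName) {
        avoidedBrokers.add(brokerName);
    }

    // Step 1: walk the queues round-robin. Step 2: skip queues whose Broker is avoided.
    String select(List<String> queueBrokers) {
        int index = counter.getAndIncrement();
        for (int i = 0; i < queueBrokers.size(); i++) {
            int pos = Math.floorMod(index++, queueBrokers.size());
            String broker = queueBrokers.get(pos);
            if (!avoidedBrokers.contains(broker)) {
                return broker;
            }
        }
        // Every Broker is being avoided: fall back to plain round-robin.
        return queueBrokers.get(Math.floorMod(index, queueBrokers.size()));
    }

    public static void main(String[] args) {
        QueueSelectorSketch selector = new QueueSelectorSketch();
        List<String> queues = Arrays.asList("broker-a", "broker-a", "broker-b", "broker-b");
        selector.markFaulty("broker-a");
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.select(queues));
        }
    }
}
```

With `broker-a` marked as faulty, every selection in `main` lands on a `broker-b` queue, which is the effect the avoidance mechanism aims for.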

The availability check in LatencyFaultToleranceImpl:

@Override
public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    return true;
}

// FaultItem: available once the current time has reached startTimestamp
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}

  1. Check whether the Broker is in the fault list. If it is not, it is available.
  2. If it is in the fault list, check whether the current time is greater than or equal to startTimestamp, the time from which the Broker may be used again.

After a message send completes, and also when a send throws an exception, updateFaultItem() is called to update the fault list. computeNotAvailableDuration() derives the length of the avoidance period from the response time: the longer the response time, the longer the period. For network exceptions, Broker exceptions, and client exceptions, a fixed response time of 30 seconds is used, which yields an avoidance period of 10 minutes. If the response time is below 550 ms, the avoidance period is 0.
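
To make the mapping from response time to avoidance period concrete, the lookup can be run on its own. The sketch below copies the two arrays and the loop from the MQFaultStrategy listing; the wrapper class `NotAvailableDurationSketch` and the sample latencies are assumptions chosen for illustration.

```java
class NotAvailableDurationSketch {
    // Values copied from the MQFaultStrategy listing (all in milliseconds).
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down and return the duration of the
    // first threshold that the latency reaches.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        long[] samples = {30L, 120L, 600L, 2500L, 30000L};
        for (long latency : samples) {
            System.out.println(latency + " ms -> " + computeNotAvailableDuration(latency) + " ms");
        }
    }
}
```

For these samples, 30 ms and 120 ms map to 0, 600 ms maps to 30000 ms, 2500 ms maps to 120000 ms, and the fixed 30000 ms used for exceptions maps to 600000 ms (10 minutes).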

The updateFaultItem method of the LatencyFaultToleranceImpl class:

@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        // Add to the fault list
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            // Another thread inserted an entry first: update that entry instead
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

FaultItem stores the Broker name, the response time, and, most importantly, startTimestamp: the time from which the Broker is considered available again. This value is what decides whether a Queue on that Broker can be selected.
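
The interplay between updating the fault list and checking availability can be shown with a minimal stand-in. This is a sketch under stated assumptions, not the RocketMQ implementation: `FaultTableSketch` is a hypothetical class, it stores only the timestamp rather than a full FaultItem, and the current time is passed in as a parameter (instead of calling System.currentTimeMillis()) so the behaviour is easy to follow.

```java
import java.util.concurrent.ConcurrentHashMap;

class FaultTableSketch {
    // Broker name -> timestamp (ms) from which the Broker counts as available again.
    private final ConcurrentHashMap<String, Long> availableFrom = new ConcurrentHashMap<>();

    // Record a fault: the Broker is avoided until nowMillis + notAvailableDuration.
    void update(String brokerName, long nowMillis, long notAvailableDuration) {
        availableFrom.put(brokerName, nowMillis + notAvailableDuration);
    }

    // A Broker with no entry is available; otherwise compare the time with the stored timestamp.
    boolean isAvailable(String brokerName, long nowMillis) {
        Long start = availableFrom.get(brokerName);
        return start == null || nowMillis - start >= 0;
    }

    public static void main(String[] args) {
        FaultTableSketch table = new FaultTableSketch();
        table.update("broker-a", 1_000L, 30_000L);
        System.out.println(table.isAvailable("broker-a", 1_000L));   // still avoided
        System.out.println(table.isAvailable("broker-a", 31_000L));  // avoidance period over
        System.out.println(table.isAvailable("broker-b", 1_000L));   // never recorded as faulty
    }
}
```

Note that an avoidance duration of 0 makes the Broker available immediately, which is why fast responses never cause a Broker to be skipped.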