1. Introduction

Seckill is a short-term, sudden and highly concurrent access problem in nature. Its service features are as follows:

  1. Timed trigger, traffic in an instant sudden increase
  2. Seckill requests are often only partially successful
  3. Instant kill products are often limited in quantity and cannot be oversold, but can accept selling less
  4. Real order results are not required to be returned immediately

This article mainly explains the actual use of RocketMQ in the seckill scenario, and does not explain other business processes in detail.

Here is the flow chart of seckill:

For a detailed implementation, see code detail: Big Guy source code

2. Seckill service overview

By asynchronizing the seckill core business process, we can divide the main process into two stages: receipt and order.

2.1 Seckill process — Receipt

  1. The user accesses the seckill portal and submits the seckill request to the acquiring gateway of the seckill platform. The platform performs pre-verification on the seckill request
  2. After the verification is passed, the order request will be submitted through the middle layer such as cache/queue/thread pool. When the delivery is completed, it will return “queuing” to the user.
  3. For the order request that the pre-verification fails, the order failure of seckill is returned synchronously

At this point, the interaction with the user side ends.

During the purchase process, the seckill order is placed into the RocketMQ middle tier.

2.2 Seckill process — order

In the ordering process, the platform pressure through the buffer of the middle layer has actually been much smaller, on the one hand, because part of the illegal requests are filtered out in the synchronous verification process of the user order; On the other hand, we do speed limiting, order pressing and other operations on order requests by doing some logic such as flow limiting and filtering in the middle layer, so as to slowly digest the order requests internally and minimize the impact of the flow on the lasting layer of the platform. Here is actually reflected in the middle layer “peak filling valley” characteristics.

Based on the above premise, we briefly summarize the business logic of seckill order part.

  1. Seckill order service obtains the order request from the middle layer and carries out the real pre-order verification. Here, the real inventory verification is mainly carried out
  2. When inventory is successfully reduced (or locked), a real order is initiated. The inventory deduction (lock inventory) and the order operation are generally in the same transaction domain
  3. After placing a successful order, the platform will often initiate a message push to inform users of the success of placing an order and guide users to make payment
  4. If the user does not pay for a period of time (e.g., 30mins), the order will be invalid, the inventory will be restored, and the purchase opportunity will be provided to other users in the queue
  5. If the payment is successful, the order status will be updated and the order will be transferred to other subsystems. For example, the logistics system will carry out the delivery and other follow-up processing of the order in the successful payment processing

At this point, it’s basically the core main process of the seckill business.

Further abstract seckill request -> middle layer -> real order this scenario, is it very much like we often use an asynchronous business processing pattern?

As you can see, that’s the producer-consumer model.

The producer-consumer pattern is often implemented within processes through blocking queues or wait-to-notify mechanisms, and between services through message queues, which is the technique used in this case. In this paper, the RocketMQ message queue is used to decouple kill orders to achieve peak load and improve system throughput.

Next, I’ll show you how to implement the above scenario using RocketMQ.

3. The actual combat

3.1 structure

  1. The user accesses the seckill gateway seckill-gateway-service and initiates a seckill operation on the interested goods. In particular, the product information is loaded to seckill-gateway-service during system initialization. During the pre-inventory check, the user order traffic has been filtered according to the cache
  2. After sufficient pre-validation of the seckill order, the gateway delivers the seckill order message to RocketMQ and synchronously returns it to the queue to the user
  3. Seckill order platform seckill-order-service subscribers seckill order message, carries out idempotent-processing on the message, and carries out real order operation after real verification of commodity inventory

3.2 Database Structure

3.3 NameServer configuration

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class MQNamesrvConfig {

    @Value("${rocketmq.nameServer.offline}")
    String offlineNamesrv;

    @Value("${rocketmq.nameServer.aliyun}")
    String aliyunNamesrv;

    /** * Select nameServer address * based on the environment@return* /
    public String nameSrvAddr(a) {
        String envType = System.getProperty("envType");
        //System.out.println(envType);
        if (StringUtils.isBlank(envType)) {
            throw new IllegalArgumentException("please insert envType");
        }
        switch (envType) {
            case "offline" : {
                return offlineNamesrv;
            }
            case "aliyun" : {
                return aliyunNamesrv;
            }
            default : {
                throw new IllegalArgumentException("please insert right envType, offline/aliyun"); }}}}Copy the code

3.4 Message Protocol

Here, through the implementation of BaseMsg template methods encode and decode (meaning to encode and decode the message respectively), through the attribute setting of this object, the self-coding and decoding of the message protocol is realized.

/ * * *@descBasic protocol class */
public abstract class BaseMsg {

    public Logger LOGGER = LoggerFactory.getLogger(this.getClass());

    /** Version number, default 1.0*/
    private String version = "1.0";
    /** Theme name */
    private String topicName;

    public abstract String encode(a);

    public abstract void decode(String msg);

    public String getVersion(a) {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public String getTopicName(a) {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    @Override
    public String toString(a) {
        return "BaseMsg{" +
                "version='" + version + '\' ' +
                ", topicName='" + topicName + '\' ' +
                '} '; }}Copy the code
/ * * *@className OrderNofityProtocol
 * @descOrder Result Notification Agreement */
public class ChargeOrderMsgProtocol extends BaseMsg implements Serializable {

    private static final long serialVersionUID = 73717163386598209L;

    /** Order Number */
    private String orderId;
    /** User orders mobile phone number */
    private String userPhoneNo;
    / * * id * / goods
    private String prodId;
    /** User transaction amount */
    private String chargeMoney;

    private Map<String, String> header;
    private Map<String, String> body;

    @Override
    public String encode(a) {
        // Assemble the message protocol header
        ImmutableMap.Builder headerBuilder = new ImmutableMap.Builder<String, String>()
                .put("version".this.getVersion())
                .put("topicName", MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic());
        header = headerBuilder.build();

        body = new ImmutableMap.Builder<String, String>()
                .put("orderId".this.getOrderId())
                .put("userPhoneNo".this.getUserPhoneNo())
                .put("prodId".this.getProdId())
                .put("chargeMoney".this.getChargeMoney())
                .build();

        ImmutableMap<String, Object> map = new ImmutableMap.Builder<String, Object>()
                .put("header", header)
                .put("body", body)
                .build();

        // Return the serialized message Json string
        String ret_string = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            ret_string = objectMapper.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("ChargeOrderMsgProtocol Message serialization JSON exception", e);
        }
        return ret_string;
    }

    @Override
    public void decode(String msg) {
        Preconditions.checkNotNull(msg);
        ObjectMapper mapper = new ObjectMapper();
        try {
            JsonNode root = mapper.readTree(msg);
            // header
            this.setVersion(root.get("header").get("version").asText());
            this.setTopicName(root.get("header").get("topicName").asText());
            // body
            this.setOrderId(root.get("body").get("orderId").asText());
            this.setUserPhoneNo(root.get("body").get("userPhoneNo").asText());
            this.setChargeMoney(root.get("body").get("chargeMoney").asText());
            this.setProdId(root.get("body").get("prodId").asText());
        } catch (IOException e) {
            throw new RuntimeException("ChargeOrderMsgProtocol message deserialization exception", e); }}public String getOrderId(a) {
        return orderId;
    }

    public ChargeOrderMsgProtocol setOrderId(String orderId) {
        this.orderId = orderId;
        return this;
    }

    public String getUserPhoneNo(a) {
        return userPhoneNo;
    }

    public ChargeOrderMsgProtocol setUserPhoneNo(String userPhoneNo) {
        this.userPhoneNo = userPhoneNo;
        return this;
    }

    public String getProdId(a) {
        return prodId;
    }

    public ChargeOrderMsgProtocol setProdId(String prodId) {
        this.prodId = prodId;
        return this;
    }

    public String getChargeMoney(a) {
        return chargeMoney;
    }

    public ChargeOrderMsgProtocol setChargeMoney(String chargeMoney) {
        this.chargeMoney = chargeMoney;
        return this;
    }

    @Override
    public String toString(a) {
        return "ChargeOrderMsgProtocol{" +
                "orderId='" + orderId + '\' ' +
                ", userPhoneNo='" + userPhoneNo + '\' ' +
                ", prodId='" + prodId + '\' ' +
                ", chargeMoney='" + chargeMoney + '\' ' +
                ", header=" + header +
                ", body=" + body +
                "}" + super.toString(); }}Copy the code

3.5 Seconds kill order producer initialization

Load via @postConstruct (init())

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.gateway.common.config.MQNamesrvConfig;
import org.apache.rocketmq.gateway.common.util.LogExceptionWapper;
import org.apache.rocketmq.message.constant.MessageProtocolConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

/ * * *@className SecKillChargeOrderProducer
 * @descSeckill order producer initialization */
@Component
public class SecKillChargeOrderProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SecKillChargeOrderProducer.class);

    @Autowired
    MQNamesrvConfig namesrvConfig;

    @Value("${rocketmq.acl.accesskey}")
    String aclAccessKey;

    @Value("${rocketmq.acl.accessSecret}")
    String aclAccessSecret;


    private DefaultMQProducer defaultMQProducer;

    @PostConstruct
    public void init(a) {
        defaultMQProducer =
                new DefaultMQProducer
                        (MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getProducerGroup(),
                                new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)));
        defaultMQProducer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
        // Retry times of sending failure
        defaultMQProducer.setRetryTimesWhenSendFailed(3);
        try {
            defaultMQProducer.start();
        } catch (MQClientException e) {
            LOGGER.error("[seconds kill order producers] - SecKillChargeOrderProducer loading exception! e={}", LogExceptionWapper.getStackTrace(e));
            throw new RuntimeException("[seconds kill order producers] - SecKillChargeOrderProducer loading exception!", e);
        }
        LOGGER.info("[seconds kill order producers] - SecKillChargeOrderProducer loaded!");
    }

    public DefaultMQProducer getProducer(a) {
        returndefaultMQProducer; }}Copy the code

3.6 Seckill Order entry (producer)

/** * Platform single interface *@param chargeOrderRequest
* @return* /
@RequestMapping(value = "charge.do", method = {RequestMethod.POST})
public @ResponseBody Result chargeOrder(@ModelAttribute ChargeOrderRequest chargeOrderRequest) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String sessionId = attributes.getSessionId();
// Check the pre-order parameters
if(! secKillChargeService.checkParamsBeforeSecKillCharge(chargeOrderRequest, sessionId)) {return Result.error(CodeMsg.PARAM_INVALID);
}
// Pre-check the goods
String prodId = chargeOrderRequest.getProdId();
if(! secKillChargeService.checkProdConfigBeforeKillCharge(prodId, sessionId)) {return Result.error(CodeMsg.PRODUCT_NOT_EXIST);
}
// Pre-reduce inventory
if(! secKillProductConfig.preReduceProdStock(prodId)) {return Result.error(CodeMsg.PRODUCT_STOCK_NOT_ENOUGH);
}
// Second kill order to enter the queue
return secKillChargeService.secKillOrderEnqueue(chargeOrderRequest, sessionId);
}
Copy the code

Producer: secKillChargeService: : secKillOrderEnqueue

/** *@param chargeOrderRequest
 * @param sessionId
 * @return* /
@Override
public Result secKillOrderEnqueue(ChargeOrderRequest chargeOrderRequest, String sessionId) {

    // Order number generation, assemble seckill order message protocol
    String orderId = UUID.randomUUID().toString();
    String phoneNo = chargeOrderRequest.getUserPhoneNum();
	
    // Message encapsulation
    ChargeOrderMsgProtocol msgProtocol = new ChargeOrderMsgProtocol();
    msgProtocol.setUserPhoneNo(phoneNo)
        .setProdId(chargeOrderRequest.getProdId())
        .setChargeMoney(chargeOrderRequest.getChargePrice())
        .setOrderId(orderId);
    String msgBody = msgProtocol.encode();
    LOGGER.info("Seckill order queued, message protocol ={}", msgBody);

    DefaultMQProducer mqProducer = secKillChargeOrderProducer.getProducer();
    // Assemble the RocketMQ message body
    Message message = new Message(MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic(), msgBody.getBytes());
    try {
        // Send the message
        SendResult sendResult = mqProducer.send(message);
        / / determine SendStatus
        if (sendResult == null) {
            LOGGER.error(MsgBody ={},sendResult=null", sessionId, msgBody);
            return Result.error(CodeMsg.BIZ_ERROR);
        }
        if(sendResult.getSendStatus() ! = SendStatus.SEND_OK) { LOGGER.error(MsgBody ={},sendResult=null", sessionId, msgBody);
            return Result.error(CodeMsg.BIZ_ERROR);
        }
        ChargeOrderResponse chargeOrderResponse = new ChargeOrderResponse();
        BeanUtils.copyProperties(msgProtocol, chargeOrderResponse);
        LOGGER.info("SessionId ={}, secessionkill order message was successfully posted, order entered the queue. The chargeOrderResponse = {}, sendResult = {}", sessionId, chargeOrderResponse.toString(), JSON.toJSONString(sendResult));
        return Result.success(CodeMsg.ORDER_INLINE, chargeOrderResponse);
    } catch (Exception e) {
        int sendRetryTimes = mqProducer.getRetryTimesWhenSendFailed();
        LOGGER.error(MsgBody ={},e={}", sessionId, sendRetryTimes, msgBody, LogExceptionWapper.getStackTrace(e));
    }
    return Result.error(CodeMsg.BIZ_ERROR);
}
Copy the code

3.7 Kill consumption in seconds

3.7.1 Defining the Consumer Client

Seconds to kill the order consumer

@Component
public class SecKillChargeOrderConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SecKillChargeOrderConsumer.class);

    @Autowired
    MQNamesrvConfig namesrvConfig;

    @Value("${rocketmq.acl.accesskey}")
    String aclAccessKey;

    @Value("${rocketmq.acl.accessSecret}")
    String aclAccessSecret;

    private DefaultMQPushConsumer defaultMQPushConsumer;

    @Resource(name = "secKillChargeOrderListenerImpl")
    private MessageListenerConcurrently messageListener;

    @PostConstruct
    public void init(a) {
        defaultMQPushConsumer =
                new DefaultMQPushConsumer(
                    MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getConsumerGroup(),
                        new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)),
                        // Average allocation of the queue algorithm, hash
                        new AllocateMessageQueueAveragely());
        defaultMQPushConsumer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
        // Start from scratch
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Consumption mode: cluster mode
        // Cluster: the same message can only be consumed by one consumer node
        // Broadcast: The same message will be consumed by every consumer
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // Register the listener
        defaultMQPushConsumer.registerMessageListener(messageListener);
        // Set the number of messages to be pulled each time. Default is 1
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
        // Subscribe to all messages
        try {
            defaultMQPushConsumer.subscribe(MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic(), "*");
            // Start the consumer
            defaultMQPushConsumer.start();
        } catch (MQClientException e) {
            LOGGER.error("[seconds kill order consumers] - SecKillChargeOrderConsumer loading exception! e={}", LogExceptionWapper.getStackTrace(e));
            throw new RuntimeException("[seconds kill order consumers] - SecKillChargeOrderConsumer loading exception!", e);
        }
        LOGGER.info("[seconds kill order consumers] - SecKillChargeOrderConsumer loaded!"); }}Copy the code

3.7.2 Implementation of seckill acquisition core logic

Seconds kill single core logic, it is to realize our own MessageListenerConcurrently.

@Component
public class SecKillChargeOrderListenerImpl implements MessageListenerConcurrently {

    private static final Logger LOGGER = LoggerFactory.getLogger(SecKillChargeOrderListenerImpl.class);

    @Resource(name = "secKillOrderService")
    SecKillOrderService secKillOrderService;

    @Autowired
    SecKillProductService secKillProductService;

    /** * Seckill core consumption logic *@param msgs
     * @param context
     * @return* /
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt msg : msgs) {
                // Message decoding
                String message = new String(msg.getBody());
                int reconsumeTimes = msg.getReconsumeTimes();
                String msgId = msg.getMsgId();
                String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes;
                LOGGER.info("[consumers] kill order - SecKillChargeOrderConsumer - receives the message, the message = {}, {}", message, logSuffix);

                // Deserialize protocol entities
                ChargeOrderMsgProtocol chargeOrderMsgProtocol = new ChargeOrderMsgProtocol();
                chargeOrderMsgProtocol.decode(message);
                LOGGER.info("[consumers] kill order - SecKillChargeOrderConsumer - deserialized to kill inbound orders entity chargeOrderMsgProtocol = {}, {}", chargeOrderMsgProtocol.toString(), logSuffix);

                // Consumption idempotent: queries whether the order corresponding to orderId already exists
                String orderId = chargeOrderMsgProtocol.getOrderId();
                OrderInfoDobj orderInfoDobj = secKillOrderService.queryOrderInfoById(orderId);
                if(orderInfoDobj ! =null) {
                    LOGGER.info("[consumers] kill order - SecKillChargeOrderConsumer - current order has been put in storage, do not need to repeat consumption! ,orderId={},{}", orderId, logSuffix);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }

                // Service idempotent: the same prodId+ the same userPhoneNo has only one kill order
                OrderInfoDO orderInfoDO = new OrderInfoDO();
                orderInfoDO.setProdId(chargeOrderMsgProtocol.getProdId())
                        .setUserPhoneNo(chargeOrderMsgProtocol.getUserPhoneNo());
                Result result = secKillOrderService.queryOrder(orderInfoDO);
                if(result ! =null && result.getCode().equals(CodeMsg.SUCCESS.getCode())) {
                    LOGGER.info("[consumers] kill order - SecKillChargeOrderConsumer - the current user = {}, seconds kill the product = {} order already exists, no duplicate seconds kill, orderId = {}",
                            orderInfoDO.getUserPhoneNo(), orderInfoDO.getProdId(), orderId);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }

                // Second kill order to store
                OrderInfoDO orderInfoDODB = new OrderInfoDO();
                BeanUtils.copyProperties(chargeOrderMsgProtocol, orderInfoDODB);

                // Check the inventory
                String prodId = chargeOrderMsgProtocol.getProdId();
                SecKillProductDobj productDobj = secKillProductService.querySecKillProductByProdId(prodId);
                // Check the inventory
                int currentProdStock = productDobj.getProdStock();
                if (currentProdStock <= 0) {
                    LOGGER.info("[decreaseProdStock] The current product is sold out, news consumption success! prodId={},currStock={}", prodId, currentProdStock);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                // Formal order
                if (secKillOrderService.chargeSecKillOrder(orderInfoDODB)) {
                    LOGGER.info("[consumers] kill order - SecKillChargeOrderConsumer - kill order warehouse is successful, the message consumption success! OrderInfoDO ={},{}", orderInfoDO.toString(), logSuffix);
                    // Simulate order processing, directly change the order status to processing
                    secKillOrderService.updateOrderStatusDealing(orderInfoDODB);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                returnConsumeConcurrentlyStatus.RECONSUME_LATER; }}catch (Exception e) {
            LOGGER.info("[Seckill order consumer] consumption is abnormal,e={}", LogExceptionWapper.getStackTrace(e));
        }
        returnConsumeConcurrentlyStatus.RECONSUME_LATER; }}Copy the code

3.7.3 Kill the actual storage

The actual placing of the order is in the same local transaction as the actual inventory deduction

/** * Seckill order to stock *@param orderInfoDO
 * @return* /
@Transactional(rollbackFor = Exception.class)
@Override
public boolean chargeSecKillOrder(OrderInfoDO orderInfoDO) {
    int insertCount = 0;
    String orderId = orderInfoDO.getOrderId();
    String prodId = orderInfoDO.getProdId();

    / / inventory reduction
    if(! secKillProductService.decreaseProdStock(prodId)) { LOGGER.info([insertSecKillOrder]orderId={},prodId={}, orderId, prodId);
        // TODO can send a notification to the user that the order of seckill has failed because the item is sold out
        return false;
    }
    // Set the product name
    SecKillProductDobj productInfo = secKillProductService.querySecKillProductByProdId(prodId);
    orderInfoDO.setProdName(productInfo.getProdName());
    try {
        insertCount = secKillOrderMapper.insertSecKillOrder(orderInfoDO);
    } catch (Exception e) {
        LOGGER.error("[insertSecKillOrder]orderId={}, [exception], transaction rollback,e={}", orderId, LogExceptionWapper.getStackTrace(e));
        String message =
                String.format([insertSecKillOrder]orderId=%s, orderId);
        throw new RuntimeException(message);
    }
    if(insertCount ! =1) {
        LOGGER.error("[insertSecKillOrder]orderId={}, [failed], transaction rollback,e={}", orderId);
        String message =
                String.format([insertSecKillOrder]orderId=%s [failed], orderId);
        throw new RuntimeException(message);
    }
    return true;
}
Copy the code

4. Summary & References

summary

Look at the flowchart again, do not understand the place, look at the source code.

Payment, logistics and other operations after placing an order can be processed asynchronously using RocketMQ.

This article all the code from the big source code

Study RocketMQ for real combat only.

The resources

  • Bosses source