The premise

Recently I ran into a requirement for delayed tasks in our production environment. I researched the current mainstream solutions, weighed their advantages and disadvantages, and settled on a final approach. This article records the research process and the first-phase implementation.

Comparison of candidate schemes

Here are the options I considered for implementing delayed tasks, along with their pros and cons.

| Scheme | Advantages | Disadvantages | Suitable scenarios |
| --- | --- | --- | --- |
| JDK built-in delay queue (DelayQueue) | Simple to implement | Data is held only in memory, so it is unreliable | Scenarios with low consistency requirements |
| Scheduling framework + MySQL with short-interval polling | Simple to implement, highly reliable | Obvious performance bottleneck | Small data volume and low real-time requirements |
| RabbitMQ DLX + TTL (commonly called the dead-letter queue scheme) | Asynchronous interaction, can absorb traffic peaks | The delay length is hard to control, and performance suffers if messages must be persisted | – |
| Scheduling framework + Redis with short-interval polling | Data can be persisted, high performance | Relatively hard to implement | Often seen in payment result callback schemes |
| Time wheel | High real-time performance | Hard to implement, high memory consumption | Scenarios with high real-time requirements |

If the application's data volume is small and the real-time requirement is low, the scheduling framework + MySQL short-interval polling scheme is the best choice. However, in the scenario I faced the data volume is relatively large while the real-time requirement is not particularly high, so polling the database would put considerable pressure on the MySQL instance. I remembered a PPT I saw a long time ago, "Evolution of Box Technology's Converged Payment System", which contains a diagram that gave me some inspiration:

It uses a scheduling framework plus Redis with short-interval polling to implement delayed tasks, and in order to spread the load across application nodes the scheme in the diagram also shards the data. Given the urgency of my current business, sharding is not considered in the first phase; only a simplified version is implemented.

Since the PPT contains no code or framework references, some technical points had to be worked out independently. The detailed implementation of the whole scheme is reproduced below.

Scenario design

In the actual production scenario, the system I am responsible for integrates with an external fund partner, and a corresponding attachment needs to be pushed 30 minutes after each fund order is placed. This is simplified here into a scenario of delayed processing of order information: each record is an OrderMessage (the name is provisional), and each OrderMessage is processed asynchronously 5 to 15 seconds after creation.

Implementation of the rejected candidates

Let's first look at the four rejected candidates, illustrating each implementation with some code and sample runs.

JDK built-in delay queue

DelayQueue is a blocking queue implementation whose elements must implement the Delayed interface. Here's a simple example:

public class DelayQueueMain {

    private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);

    public static void main(String[] args) throws Exception {
        DelayQueue<OrderMessage> queue = new DelayQueue<>();
        // The default delay is 5 seconds
        OrderMessage message = new OrderMessage("ORDER_ID_10086");
        queue.add(message);
        // Delay of 6 seconds
        message = new OrderMessage("ORDER_ID_10087", 6);
        queue.add(message);
        // Delay of 10 seconds
        message = new OrderMessage("ORDER_ID_10088", 10);
        queue.add(message);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setName("DelayWorker");
            thread.setDaemon(true);
            return thread;
        });
        LOGGER.info("Start executing the scheduling thread...");
        executorService.execute(() -> {
            while (true) {
                try {
                    OrderMessage task = queue.take();
                    LOGGER.info("Delay processing order messages,{}", task.getDescription());
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class OrderMessage implements Delayed {

        private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        /** The default delay is 5000 milliseconds */
        private static final long DELAY_MS = 1000L * 5;

        /** Order ID */
        private final String orderId;

        /** Creation timestamp */
        private final long timestamp;

        /** Expiration timestamp */
        private final long expire;

        /** Description */
        private final String description;

        public OrderMessage(String orderId, long expireSeconds) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + expireSeconds * 1000L;
            this.description = String.format("Order [%s]- Creation time :%s, timeout :%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }

        public OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + DELAY_MS;
            this.description = String.format("Order [%s]- Creation time :%s, timeout :%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
        }

        public String getOrderId() {
            return orderId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public long getExpire() {
            return expire;
        }

        public String getDescription() {
            return description;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            // Long.compare avoids the overflow risk of casting the difference to int
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}

Note that OrderMessage implements the Delayed interface; the key points are the implementations of Delayed#getDelay() and Comparable#compareTo(). Run the main() method:

10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - Start executing the scheduling thread...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - Delay processing order messages, Order [ORDER_ID_10086] - Creation time: 2019-08-20 10:16:08, timeout: 2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - Delay processing order messages, Order [ORDER_ID_10087] - Creation time: 2019-08-20 10:16:08, timeout: 2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - Delay processing order messages, Order [ORDER_ID_10088] - Creation time: 2019-08-20 10:16:08, timeout: 2019-08-20 10:16:18

Scheduling framework + MySQL

Using a scheduling framework to poll MySQL tables at short intervals is a relatively easy solution to implement. It should usually be preferred when a service has just gone live, table data is small and the real-time requirement is low. But note the following:

  • The polling interval must not be too short, otherwise it will put pressure on the MySQL instance.
  • Limit the number of rows fetched per query. A result set that is too large may block scheduling and occupy a large amount of application memory, hurting timeliness.
  • Design status values and a maximum retry count so that large data backlogs and repeated queries are avoided as much as possible.
  • Preferably query by an indexed time column over a bounded time range.

Introduce Quartz, the MySQL Java driver and spring-boot-starter-jdbc (relatively lightweight choices used here purely for convenience; in production you can select other, more suitable frameworks as needed):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>2.1.7.RELEASE</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.1</version>
    <scope>test</scope>
</dependency>

The hypothetical table design is as follows:

CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;

USE `delayTask`;

CREATE TABLE `t_order_message`
(
    id           BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
    order_id     VARCHAR(50) NOT NULL COMMENT 'Order ID',
    create_time  DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
    edit_time    DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Last modified time',
    retry_times  TINYINT     NOT NULL DEFAULT 0 COMMENT 'Retry count',
    order_status TINYINT     NOT NULL DEFAULT 0 COMMENT 'Order status',
    INDEX idx_order_id (order_id),
    INDEX idx_create_time (create_time)
) COMMENT 'Order message table';

INSERT INTO t_order_message(order_id) VALUES ('10086'), ('10087');

Write code:

// Constants
public class OrderConstants {

    public static final int MAX_RETRY_TIMES = 5;

    public static final int PENDING = 0;

    public static final int SUCCESS = 1;

    public static final int FAIL = -1;

    public static final int LIMIT = 10;
}

// Entity
@Builder
@Data
public class OrderMessage {

    private Long id;
    private String orderId;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private Integer retryTimes;
    private Integer orderStatus;
}

// DAO
@RequiredArgsConstructor
public class OrderMessageDao {

    private final JdbcTemplate jdbcTemplate;

    private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
        List<OrderMessage> list = Lists.newArrayList();
        while (r.next()) {
            list.add(OrderMessage.builder()
                    .id(r.getLong("id"))
                    .orderId(r.getString("order_id"))
                    .createTime(r.getTimestamp("create_time").toLocalDateTime())
                    .editTime(r.getTimestamp("edit_time").toLocalDateTime())
                    .retryTimes(r.getInt("retry_times"))
                    .orderStatus(r.getInt("order_status"))
                    .build());
        }
        return list;
    };

    public List<OrderMessage> selectPendingRecords(LocalDateTime start,
                                                   LocalDateTime end,
                                                   List<Integer> statusList,
                                                   int maxRetryTimes,
                                                   int limit) {
        StringJoiner joiner = new StringJoiner(",");
        statusList.forEach(s -> joiner.add(String.valueOf(s)));
        return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " +
                        "AND order_status IN (?) AND retry_times < ? LIMIT ?",
                p -> {
                    p.setTimestamp(1, Timestamp.valueOf(start));
                    p.setTimestamp(2, Timestamp.valueOf(end));
                    p.setString(3, joiner.toString());
                    p.setInt(4, maxRetryTimes);
                    p.setInt(5, limit);
                }, M);
    }

    public int updateOrderStatus(Long id, int status) {
        return jdbcTemplate.update("UPDATE t_order_message SET order_status = ? ,edit_time = ? WHERE id =?",
                p -> {
                    p.setInt(1, status);
                    p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
                    p.setLong(3, id);
                });
    }
}

// Service
@RequiredArgsConstructor
public class OrderMessageService {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);

    private final OrderMessageDao orderMessageDao;

    private static final List<Integer> STATUS = Lists.newArrayList();

    static {
        STATUS.add(OrderConstants.PENDING);
        STATUS.add(OrderConstants.FAIL);
    }

    public void executeDelayJob() {
        LOGGER.info("Order processing scheduled task starts to execute......");
        LocalDateTime end = LocalDateTime.now();
        // One day ago
        LocalDateTime start = end.minusDays(1);
        List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
        if (!list.isEmpty()) {
            for (OrderMessage m : list) {
                LOGGER.info("Processing order [{}], status updated from {} to {}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
                // This can be optimized for batch update
                orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
            }
        }
        LOGGER.info("Order processing scheduled tasks start complete......"); }}// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
        service.executeDelayJob();
    }

    public static void main(String[] args) throws Exception {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask? useSSL=false&characterEncoding=utf8");
        config.setDriverClassName(Driver.class.getName());
        config.setUsername("root");
        config.setPassword("root");
        HikariDataSource dataSource = new HikariDataSource(config);
        OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
        OrderMessageService service = new OrderMessageService(orderMessageDao);
        // Memory mode scheduler
        StdSchedulerFactory factory = new StdSchedulerFactory();
        Scheduler scheduler = factory.getScheduler();
        // Instead of using the IOC container, pass the service reference directly with the Quartz data set
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put("orderMessageService", service);
        // Create the Job
        JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
                .withIdentity("orderMessageDelayJob", "delayJob")
                .usingJobData(jobDataMap)
                .build();
        // Create a trigger and execute it every 10 seconds
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("orderMessageDelayTrigger"."delayJob")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                .build();
        scheduler.scheduleJob(job, trigger);
        // Start the scheduler
        scheduler.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

This example polls by create_time. In practice you could add a schedule_time column and poll on that instead, which makes it easier to apply different scheduling policies for idle and busy periods (a sketch of this idea appears after the log output below). The example above produces output like the following:

11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - Order processing scheduled task starts to execute......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - Processing order [10086], status updated from 0 to 1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - Processing order [10087], status updated from 0 to 1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - Order processing scheduled task execution completed......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
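
Regarding the schedule_time suggestion above, the sketch below assumes a hypothetical, indexed schedule_time DATETIME column added to t_order_message. The writer records when a row should be processed, and the poller selects only rows that are already due, so the scanned range stays small regardless of how old the orders are. This is an illustration of the idea, not part of the example project.

import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

import org.springframework.jdbc.core.JdbcTemplate;

// Sketch only: assumes an extra, indexed schedule_time DATETIME column on t_order_message.
public class ScheduleTimePollingSketch {

    private final JdbcTemplate jdbcTemplate;

    public ScheduleTimePollingSketch(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    // Writer side: record when the order should be processed (e.g. 30 minutes after creation)
    public int insert(String orderId, LocalDateTime scheduleTime) {
        return jdbcTemplate.update(
                "INSERT INTO t_order_message(order_id, schedule_time) VALUES (?, ?)",
                orderId, Timestamp.valueOf(scheduleTime));
    }

    // Poller side: only rows whose schedule_time has already passed are candidates
    public List<Map<String, Object>> selectDue(int limit) {
        return jdbcTemplate.queryForList(
                "SELECT * FROM t_order_message WHERE schedule_time <= ? AND order_status = 0 LIMIT ?",
                Timestamp.valueOf(LocalDateTime.now()), limit);
    }
}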

RabbitMQ dead letter queue

Using the RabbitMQ dead letter queue relies on two features of RabbitMQ: TTL and DLX.

  • TTL: Time To Live, the message survival time. It has two dimensions: the queue-level TTL and the per-message TTL.
  • DLX: Dead Letter Exchange, the dead-letter exchange.

Let me draw a picture of these two properties:

Below, for simplicity, the TTL is set at the queue level. Introduce the RabbitMQ Java client:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
    <scope>test</scope>
</dependency>

The code is as follows:

public class DlxMain {

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel producerChannel = connection.createChannel();
        Channel consumerChannel = connection.createChannel();
        // The DLX exchange is named dlx.exchange with type direct, the binding key is dlx.key, and the queue is dlx.queue
        producerChannel.exchangeDeclare("dlx.exchange", "direct");
        producerChannel.queueDeclare("dlx.queue", false, false, false, null);
        producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
        Map<String, Object> queueArgs = new HashMap<>();
        // Set the queue-level message expiration time to 5 seconds
        queueArgs.put("x-message-ttl", 5000);
        // Specify the DLX parameters
        queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
        queueArgs.put("x-dead-letter-routing-key", "dlx.key");
        // Declare the business queue
        producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("DlxConsumer");
            return thread;
        });
        // Start the consumer
        executorService.execute(() -> {
            try {
                consumerChannel.basicConsume("dlx.queue".true.new DlxConsumer(consumerChannel));
            } catch(IOException e) { LOGGER.error(e.getMessage(), e); }}); OrderMessage message =new OrderMessage("10086");
        producerChannel.basicPublish(""."business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("Message sent successfully, order ID:{}", message.getOrderId());

        message = new OrderMessage("10087");
        producerChannel.basicPublish(""."business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("Message sent successfully, order ID:{}", message.getOrderId());

        message = new OrderMessage("10088");
        producerChannel.basicPublish(""."business.queue", MessageProperties.TEXT_PLAIN,
                message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("Message sent successfully, order ID:{}", message.getOrderId());

        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DlxConsumer extends DefaultConsumer {

        DlxConsumer(Channel channel) {
            super(channel);
        }

        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            LOGGER.info("Processing message successfully :{}".newString(body, StandardCharsets.UTF_8)); }}private static class OrderMessage {

        private final String orderId;
        private final long timestamp;
        private final String description;

        OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.description = String.format("Order [%s], order creation time :%s", orderId,
                    LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
        }

        public String getOrderId() {
            return orderId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public String getDescription() {
            return description;
        }
    }
}

Running the main() method results in the following:

16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - Message sent successfully, order ID: 10086
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - Message sent successfully, order ID: 10087
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - Message sent successfully, order ID: 10088
16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - Processing message successfully: Order [10086], order creation time: 2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - Processing message successfully: Order [10087], order creation time: 2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - Processing message successfully: Order [10088], order creation time: 2019-08-20 16:35:58
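
The example above sets the TTL at the queue level via x-message-ttl. The other dimension mentioned earlier, per-message TTL, is set through the message properties. A minimal sketch follows, assuming the same business.queue/DLX topology declared above; note that RabbitMQ only expires a per-message-TTL message when it reaches the head of the queue, which is one reason the delay length is described as hard to control.

import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

public final class PerMessageTtlSketch {

    // Publishes a message whose TTL is set per message rather than per queue.
    // Assumes business.queue has already been declared with the DLX arguments shown above.
    static void publishWithTtl(Channel channel, String orderId, long ttlMillis) throws Exception {
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .expiration(String.valueOf(ttlMillis)) // TTL in milliseconds, as a string
                .build();
        channel.basicPublish("", "business.queue", props,
                ("Order [" + orderId + "]").getBytes(StandardCharsets.UTF_8));
    }
}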

Time wheel

A timing wheel (TimingWheel) is a scheduling data structure with high efficiency and low latency. The underlying implementation uses an array as a ring queue, where each slot stores a list of tasks. The schematic diagram is as follows:

Rather than analyzing the time wheel and its implementations in depth, I will simply illustrate how to use one for delayed tasks, using the HashedWheelTimer provided by Netty. Introduce the dependency:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.39.Final</version>
</dependency>

The code is as follows:

public class HashedWheelTimerMain {

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
            return thread;
        };
        // tickDuration - the time span of each tick (moving to the next slot)
        // unit - the time unit of tickDuration
        // ticksPerWheel - the number of slots in the wheel
        Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60);
        TimerTask timerTask = new DefaultTimerTask("10086");
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10087");
        timer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10088");
        timer.newTimeout(timerTask, 15, TimeUnit.SECONDS);
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DefaultTimerTask implements TimerTask {

        private final String orderId;
        private final long timestamp;

        public DefaultTimerTask(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
        }

        @Override
        public void run(Timeout timeout) throws Exception {
            System.out.println(String.format("Task execution time :%s, order creation time :%s, Order ID:%s", LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), orderId)); }}}Copy the code

Running result:

Task execution time: 2019-08-20 17:19:49.310, order creation time: 2019-08-20 17:19:43.294, order ID: 10086
Task execution time: 2019-08-20 17:19:54.297, order creation time: 2019-08-20 17:19:43.301, order ID: 10087
Task execution time: 2019-08-20 17:19:59.297, order creation time: 2019-08-20 17:19:43.301, order ID: 10088

In general, the actual task logic should be handed off to a separate business thread pool so that it does not block the time wheel's own worker thread.
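
A minimal sketch of that pattern, assuming a hypothetical fixed-size business pool: the TimerTask body only dispatches work to the pool, so the HashedWheelTimer worker thread is never blocked by business logic.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

public class NonBlockingTimerTaskSketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical business pool; size it according to the real workload
        ExecutorService businessPool = Executors.newFixedThreadPool(4);
        Timer timer = new HashedWheelTimer();
        TimerTask task = timeout -> businessPool.execute(() -> {
            // The real work (e.g. pushing the order attachment) runs here,
            // off the time wheel's worker thread
            System.out.println("Processing order ORDER_ID_10086");
        });
        timer.newTimeout(task, 5, TimeUnit.SECONDS);
        Thread.sleep(10_000);
        timer.stop();
        businessPool.shutdown();
    }
}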

The implementation process of the selected scheme

In the end, the scheme based on a Redis Sorted Set plus Quartz short-interval polling was selected. The concrete plan is:

  1. When an order is created, the order ID and the current timestamp are added to the order queue Sorted Set as the member and the score respectively.
  2. When an order is created, the order ID and the push content (a JSON string) are added to the order queue content Hash as the field and the value respectively.
  3. Steps 1 and 2 are wrapped in a Lua script to guarantee atomicity.
  4. An asynchronous thread uses the Sorted Set command ZREVRANGEBYSCORE to pop a limited number of order IDs, fetches the corresponding push content from the order queue content Hash, and processes it.

There are two options for handling point 4:

  • Option 1: Pop the order content and delete the data in the same step, i.e. ZREVRANGEBYSCORE, ZREM and HDEL must run in the same Lua script. The Lua script is harder to write, and since the popped data no longer exists in Redis, a failure during processing may require compensation, such as querying the database again.
  • Option 2: Pop the order content first, and only delete the corresponding entries in the order queue Sorted Set and the order queue content Hash after processing succeeds. In this case concurrency has to be controlled, and repeated execution is possible.

In the end option 1 was chosen: the order IDs are popped from the Sorted Set, the push data is read from the Hash, and the entries are deleted from both collections in one atomic step. The flow of the solution looks roughly like this:

The Redis commands used are described in detail below.

Sorted Set Related commands

  • ZADD command - adds one or more members and their scores to a sorted set.

ZADD KEY SCORE1 VALUE1 ... SCOREN VALUEN


  • ZREVRANGEBYSCORE command - returns the members of a sorted set within the given score range, ordered from highest score to lowest.

ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

  • max: upper bound of the score range.
  • min: lower bound of the score range.
  • WITHSCORES: optional; if specified, the scores are returned together with the members.
  • LIMIT: optional; offset and count behave like MySQL's LIMIT offset,size. If not specified, the whole matching range is returned.

  • ZREM command - removes one or more members from a sorted set. Members that do not exist are ignored.

ZREM key member [member ...]

Hash Related commands

  • HMSET command - sets multiple field-value pairs on a hash at once.

HMSET KEY_NAME FIELD1 VALUE1 ... FIELDN VALUEN


  • HDEL command - deletes one or more fields from a hash. Fields that do not exist are ignored.

HDEL KEY_NAME FIELD1 ... FIELDN
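
To make the command semantics concrete, here is a minimal Jedis sketch that exercises the same commands against the two queue keys used later. Issued this way, the read-then-delete sequence is not atomic; the Lua scripts in the next section exist precisely to close that gap. The localhost connection details are assumptions.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import redis.clients.jedis.Jedis;

public final class RedisCommandSketch {

    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // Enqueue: order ID as the member, creation timestamp as the score,
            // plus the push payload in the companion hash
            long now = System.currentTimeMillis();
            jedis.zadd("ORDER_QUEUE", now, "ORDER_ID_10086");
            Map<String, String> detail = new HashMap<>();
            detail.put("ORDER_ID_10086", "{\"orderId\":\"ORDER_ID_10086\"}");
            jedis.hmset("ORDER_DETAIL_QUEUE", detail);

            // Dequeue: members whose score is at most "now", newest first, capped at 10
            Set<String> due = jedis.zrevrangeByScore("ORDER_QUEUE", now, 0, 0, 10);
            if (!due.isEmpty()) {
                String[] ids = due.toArray(new String[0]);
                List<String> payloads = jedis.hmget("ORDER_DETAIL_QUEUE", ids);
                jedis.zrem("ORDER_QUEUE", ids);
                jedis.hdel("ORDER_DETAIL_QUEUE", ids);
                payloads.forEach(System.out::println);
            }
        }
    }
}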

Lua related

  • Load a Lua script and return its SHA-1 checksum: SCRIPT LOAD script.
  • Execute a previously loaded Lua script: EVALSHA sha1 numkeys key [key ...] arg [arg ...].
  • The unpack function expands a table into variadic arguments, but note that unpack must be the last argument of the enclosing function call, otherwise only its first value is passed. See the Stack Overflow question "table.unpack() only returns the first element" for details.

PS: If you are not familiar with Lua, it is worth learning it systematically, because using Redis well is hard to do without Lua.

Introduce the dependencies:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.7.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>    
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>5.1.9.RELEASE</version>
    </dependency> 
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.59</version>
    </dependency>       
</dependencies>

Write the Lua scripts /lua/enqueue.lua and /lua/dequeue.lua:

-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil

-- /lua/dequeue.lua
-- Refer to jesque's partial Lua script implementation
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- The result of the TYPE command is a table like {'ok':'zset'}, so next() is used for one iteration
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- The unpack function expands a table into variadic arguments
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil

Write the core API code:

// The Jedis provider
@Component
public class JedisProvider implements InitializingBean {

    private JedisPool jedisPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        jedisPool = new JedisPool();
    }

    public Jedis provide() {
        return jedisPool.getResource();
    }
}

// OrderMessage
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
    // Formatted creation time string; populated via setTimestamp() in the test code below
    private String timestamp;
}

// Delay queue interface
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min, String max, String offset, String limit);

    List<OrderMessage> dequeue();

    String enqueueSha();

    String dequeueSha();
}

// Delay queue implementation class
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
    private static final List<String> KEYS = Lists.newArrayList();

    private final JedisProvider jedisProvider;

    static {
        KEYS.add(ORDER_QUEUE);
        KEYS.add(ORDER_DETAIL_QUEUE);
    }

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
    }

    @Override
    public List<OrderMessage> dequeue() {
        // 30 minutes ago
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min, String max, String offset, String limit) {
        List<String> args = new ArrayList<>();
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e, OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Load the Lua script
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null, sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null, sha);
        }
    }

    public static void main(String[] as) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        JedisProvider jedisProvider = new JedisProvider();
        jedisProvider.afterPropertiesSet();
        RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider);
        queue.afterPropertiesSet();
        // Write the test data
        OrderMessage message = new OrderMessage();
        message.setAmount(BigDecimal.valueOf(10086));
        message.setOrderId("ORDER_ID_10086");
        message.setUserId(10086L);
        message.setTimestamp(LocalDateTime.now().format(f));
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // The score is set to 30 minutes earlier
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
        }
        List<OrderMessage> dequeue = queue.dequeue();
        System.out.println(dequeue);
    }
}

To verify that the delay queue is in effect, run the main() method first:

[OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp=2019-08-21 08:32:22.885)]

Once the delay queue code is confirmed to work, write the Quartz Job consumer OrderMessageConsumer:

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final AtomicInteger COUNTER = new AtomicInteger();
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        LOGGER.info("Order message processing scheduled task starts to execute......");
        List<OrderMessage> messages = orderDelayQueue.dequeue();
        if (!messages.isEmpty()) {
            // Naively partition the list and hand the partitions to the thread pool
            List<List<OrderMessage>> partition = Lists.partition(messages, 2);
            int size = partition.size();
            final CountDownLatch latch = new CountDownLatch(size);
            for (List<OrderMessage> p : partition) {
                BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch));
            }
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        stopWatch.stop();
        LOGGER.info("The scheduled task for order message processing is completed. Time :{} ms......", stopWatch.getTotalTimeMillis());
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final List<OrderMessage> messages;
        private final CountDownLatch latch;

        @Override
        public void run() {
            try {
                // In reality, exceptions should be caught for each order individually
                for (OrderMessage message : messages) {
                    LOGGER.info("Process order information, content: {}", message);
                }
            } finally {
                latch.countDown();
            }
        }
    }
}

The consumer above was designed with the following considerations in mind:

  • The @DisallowConcurrentExecution annotation prevents the Job from running concurrently. Concurrent execution would not help much here anyway: we are using short-interval polling, and Redis processes commands on a single thread, so adding more client-side threads gains little.
  • The capacity of the BUSINESS_WORKER_POOL thread pool (and its queue) should be chosen based on the LIMIT value, the partition size used to split the order list, and the actual execution time of a ConsumeTask; a fixed-size pool is used here purely for convenience.
  • ConsumeTask should either catch and swallow exceptions for each order individually, or encapsulate the handling of a single order in a method that never throws; a sketch of this variant follows the list.
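
A sketch of the third point, intended as a replacement for the ConsumeTask nested class inside OrderMessageConsumer (the handleOne method is a placeholder for the real push logic):

    // Each order gets its own try/catch so that one bad record cannot abort the whole batch
    @RequiredArgsConstructor
    static class SafeConsumeTask implements Runnable {

        private static final Logger LOGGER = LoggerFactory.getLogger(SafeConsumeTask.class);

        private final List<OrderMessage> messages;
        private final CountDownLatch latch;

        @Override
        public void run() {
            try {
                for (OrderMessage message : messages) {
                    try {
                        handleOne(message);
                    } catch (Exception e) {
                        // Swallow and log; a failed order is already gone from Redis, so it
                        // would need a compensation path such as re-querying the database
                        LOGGER.error("Failed to process order [{}]", message.getOrderId(), e);
                    }
                }
            } finally {
                latch.countDown();
            }
        }

        private void handleOne(OrderMessage message) {
            LOGGER.info("Process order information, content: {}", message);
        }
    }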

Other Quartz related code:

// Quartz configuration class
@Configuration
public class QuartzAutoConfiguration {

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setAutoStartup(true);
        factory.setJobFactory(quartzAutowiredJobFactory);
        return factory;
    }

    @Bean
    public QuartzAutowiredJobFactory quartzAutowiredJobFactory(a) {
        return new QuartzAutowiredJobFactory();
    }

    public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            // Use the AutowireCapableBeanFactory to autowire the newly created Job instance,
            // yielding a prototype-scoped Job bean
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}

The in-memory RAMJobStore is used here temporarily to store job and trigger information. In production it is better to switch to the MySQL-backed clustered mode (i.e. JobStoreTX). Finally, the launcher class, which also implements CommandLineRunner:

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class})
public class Application implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Prepare some test data
        prepareOrderMessageData();
        JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class)
                .withIdentity("OrderMessageConsumer"."DelayTask")
                .build();
        // Triggers every 5 seconds
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("OrderMessageConsumerTrigger"."DelayTask")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
                .build();
        scheduler.scheduleJob(job, trigger);
    }

    private void prepareOrderMessageData() throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            // Lua is not used for the time being
            Map<String, Double> map = Maps.newHashMap();
            Map<String, String> hash = Maps.newHashMap();
            for (OrderMessage message : messages) {
                // Make the score 30 minutes old
                map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                hash.put(message.getOrderId(), JSON.toJSONString(message));
            }
            jedis.zadd("ORDER_QUEUE", map);
            jedis.hmset("ORDER_DETAIL_QUEUE", hash); }}}Copy the code

The output is as follows:

2019-08-21 22:45:59.518  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : Order message processing scheduled task starts to execute......
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_91, amount=91, userId=91, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_95, amount=95, userId=95, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_97, amount=97, userId=97, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_99, amount=99, userId=99, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_93, amount=93, userId=93, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_94, amount=94, userId=94, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_96, amount=96, userId=96, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_92, amount=92, userId=92, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_98, amount=98, userId=98, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_90, amount=90, userId=90, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.540  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : Order message processing scheduled task completed, took 22 ms......
2019-08-21 22:46:04.515  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : Order message processing scheduled task starts to execute......
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_89, amount=89, userId=89, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_87, amount=87, userId=87, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_85, amount=85, userId=85, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_88, amount=88, userId=88, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_83, amount=83, userId=83, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_81, amount=81, userId=81, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_86, amount=86, userId=86, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_82, amount=82, userId=82, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_84, amount=84, userId=84, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : Process order information, content: OrderMessage(orderId=ORDER_ID_80, amount=80, userId=80, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : Order message processing scheduled task completed, took 1 ms......

The first execution involves the initialization of some components and is therefore slow. As the later runs show, the scheduled task then executes quickly, since we merely print the order information here. If you keep the current architecture, note the following for production:

  • Switch the JobStore to JDBC mode. Quartz's official documentation has a complete tutorial, or see the Quartz documentation I translated earlier; a minimal configuration sketch follows this list.
  • Monitor or collect the execution status of the tasks, add alerting, and so on.
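
For the first point, a minimal configuration sketch using plain Quartz properties (the property names follow the standard Quartz configuration reference; the MySQL coordinates are placeholders, and the official QRTZ_* tables must already exist in that schema):

import java.util.Properties;

import org.quartz.Scheduler;
import org.quartz.impl.StdSchedulerFactory;

public final class ClusteredSchedulerSketch {

    // Builds a clustered, MySQL-backed scheduler instead of the in-memory RAMJobStore
    public static Scheduler build() throws Exception {
        Properties props = new Properties();
        props.setProperty("org.quartz.scheduler.instanceName", "DelayTaskScheduler");
        props.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        props.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        props.setProperty("org.quartz.threadPool.threadCount", "10");
        props.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        props.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        props.setProperty("org.quartz.jobStore.isClustered", "true");
        props.setProperty("org.quartz.jobStore.dataSource", "quartzDS");
        props.setProperty("org.quartz.dataSource.quartzDS.driver", "com.mysql.jdbc.Driver");
        props.setProperty("org.quartz.dataSource.quartzDS.URL", "jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
        props.setProperty("org.quartz.dataSource.quartzDS.user", "root");
        props.setProperty("org.quartz.dataSource.quartzDS.password", "root");
        return new StdSchedulerFactory(props).getScheduler();
    }
}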

The ZREVRANGEBYSCORE command has O(log(N)+M) complexity, where N is the number of members in the Sorted Set and M is the number of elements returned. Since all pending order information lives in the same Sorted Set (ORDER_QUEUE), the cost of the dequeue script grows as the set grows; once the order volume increases, this will become a performance bottleneck. A solution will be given in a follow-up article.

Summary

This article started from a simulated example of a real production case, analyzed several common implementations of delayed tasks, and gave a complete example based on Redis and Quartz. The current example merely runs; several issues remain unresolved. The next article will improve it in two respects:

  1. Sharding.
  2. Monitoring.

In addition, architecture evolves with the business form; many things need to be designed and improved for the specific scenario. The ideas here are for reference only; do not copy the code blindly.

Attachments

  • Markdown and PPT original: github.com/zjcscut/blo…
  • Making Page: www.throwable.club/2019/08/21/…
  • Coding Page: throwable.coding.me/2019/08/21/…

(By the way, the blog has enabled an RSS plugin; see the icon on the home page. Subscriptions are welcome.)

The technical WeChat public account (Throwable Digest) pushes the author's original technical articles from time to time (never plagiarized or reprinted):