Background

Recently we needed a delayed-processing feature: after consuming a message from Kafka, its processing should be delayed according to a delay field carried in the message. A few points came up during the actual implementation that are worth noting, and they are recorded below.

The implementation process

Speaking of scheduling in Java, the first things that come to mind are Timer and ScheduledThreadPoolExecutor. Comparing the two, Timer can be ruled out, mainly for the following reasons:

  • Timer uses absolute time, so changes to the system clock affect it. ScheduledThreadPoolExecutor uses relative time and does not have this problem.
  • Timer uses a single thread to handle tasks, so a long-running task delays the other tasks, while ScheduledThreadPoolExecutor lets you configure the number of threads.
  • Timer does not handle runtime exceptions: once a task throws one, the whole Timer collapses. ScheduledThreadPoolExecutor catches runtime exceptions (they can be handled in the afterExecute() callback, as in the sketch after this list), so it is safer.
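
On that last point, here is a minimal sketch of pulling a task's exception out in afterExecute(); the subclass name and the plain System.err logging are placeholders I made up for illustration.

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;

public class LoggingScheduledExecutor extends ScheduledThreadPoolExecutor {

    public LoggingScheduledExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Scheduled tasks are wrapped in a Future, so a thrown exception usually
        // has to be extracted from the Future rather than arriving as "t".
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            // Log the failure instead of losing it silently.
            System.err.println("scheduled task failed: " + t);
        }
    }
}

With a plain Timer, by contrast, the first uncaught runtime exception terminates the timer thread and cancels all remaining tasks.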
  1. ScheduledThreadPoolExecutor: having decided to implement it with ScheduledThreadPoolExecutor, the next step is coding (rough process code). The main delay logic is as follows:
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10,
        new NamedThreadFactory("scheduleThreadPool"), new ThreadPoolExecutor.AbortPolicy());
int delayTime = 0; // in practice taken from the consumed message
executorService.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() { /* delayed business logic */ }
}, 0, delayTime, TimeUnit.SECONDS);

Here, NamedThreadFactory is a thread factory I customized, mainly to set the thread pool's name and related log output for later problem analysis; it will not be covered here. The default rejection policy is used as well. After testing, the function delays execution by the specified time and meets the target requirement, so it looks like the job is done. You may wonder what there is to say about something this easy, but this simple approach hides a potential problem. To see it, let's look at the ScheduledThreadPoolExecutor source code:

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

Because of its delay and periodic-execution semantics, ScheduledThreadPoolExecutor uses a DelayedWorkQueue by default. Unlike executors such as SingleThreadExecutor, where you can pass in your own LinkedBlockingQueue and set its capacity, this is exactly where the problem lies: DelayedWorkQueue is an unbounded queue, and our data source is Kafka, a high-concurrency, high-throughput message queue. It is quite possible for a huge number of messages to arrive within a short period and cause an OOM. When using multithreading we must take OOM into account, because it usually has serious consequences: the typical stopgap for an OOM is a restart, which may cause unrecoverable problems such as loss of user data. So from the coding and design stage onward we should be as safe as possible and avoid these problems.
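
To make the risk concrete, here is a minimal, hypothetical demo (class name and task count are made up): it schedules a burst of delayed tasks the way a flood of Kafka messages would, and every one of them sits in the unbounded DelayedWorkQueue until its delay expires.

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
        // Simulate a burst of messages, each becoming a task delayed by 30 minutes.
        for (int i = 0; i < 1_000_000; i++) {
            executor.schedule(() -> { /* delivery notification logic */ }, 30, TimeUnit.MINUTES);
        }
        // Nothing is rejected, because the queue has no capacity limit;
        // all one million tasks are held in memory until their delays expire.
        System.out.println("queued tasks: " + executor.getQueue().size());
        executor.shutdown();
    }
}

With large message payloads captured by each task, this growth is exactly the OOM scenario described above, and the rejection policy passed to the constructor never gets a chance to kick in.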

  2. Combining Redis with a thread: this time the idea changed to using Redis as a buffer, which avoids the OOM problem caused by too many messages. The Redis ZSET API:
ZADD key score member [score member ...]
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
ZREM key member [member ...]

We use Redis's ZSET data structure, with the score holding the target send time. The overall processing flow is as follows:

  • Step one, storing the data: at 9:10 we receive an order message A from Kafka that needs a delivery notification 30 minutes later. We add 30 minutes to the current time, convert it to a timestamp, and use it as A's score, storing the order number A as the member in Redis. The code is as follows:
public void onMessage(String topic, String message) {
    String orderId;
    int delayTime = 0;
    try {
        Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {
        }.getType());
        if (msgMap.isEmpty()) {
            return;
        }
        LOGGER.info("onMessage kafka content:{}", msgMap.toString());
        orderId = msgMap.get("orderId");
        if (StringUtils.isNotEmpty(orderId)) {
            delayTime = Integer.parseInt(msgMap.get("delayTime"));
            Calendar calendar = Calendar.getInstance();
            // target send time = now + delayTime minutes, used as the ZSET score
            calendar.add(Calendar.MINUTE, delayTime);
            long sendTime = calendar.getTimeInMillis();
            RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);
            LOGGER.info("orderId:{} -- put into redis to be sent -- sendTime:{}", orderId, sendTime);
        }
    } catch (Exception e) {
        LOGGER.info("onMessage delay error:{}", e);
    }
}
  • Step two, processing the data: the scheduling interval of a separate thread is decided by business requirements; here I run it every 3 minutes. The internal logic: fetch a batch of data from the Redis ZSET. How? Use ZSET's zrangeByScore method, which returns members ordered by score, and restrict the score range from 0 up to now so only due entries are consumed. After taking the data out, call the ZREM method to delete it from the ZSET so that other threads cannot consume it repeatedly. Then comes the delivery notification and other related logic. The code is as follows:
public void run() {
    try {
        int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM, "100"));
        Calendar calendar = Calendar.getInstance();
        long now = calendar.getTimeInMillis();
        // Fetch keys with a score from 0 up to now
        // (the lower bound stays at 0 in case an earlier batch left entries behind)
        Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);
        LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));
        if (CollectionUtils.isNotEmpty(orderIds)) {
            // Delete the keys to prevent repeated sending
            for (String orderId : orderIds) {
                RedisUtils.getInstance().zrem(Constant.DELAY, orderId);
            }
            // delivery notification and other business logic follows
        }
    } catch (Exception e) {
        LOGGER.warn("task.run exception:{}", e);
    }
}
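
One piece not shown above is how this task gets scheduled to run every 3 minutes. A minimal sketch, assuming the run() method above lives in a Runnable called DelayOrderTask (a hypothetical name) and reusing the NamedThreadFactory mentioned earlier:

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DelayTaskBootstrap {
    public static void main(String[] args) {
        // A single polling thread is enough here: the backlog is buffered in Redis,
        // so the executor's queue stays small no matter how many messages arrive.
        ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1,
                new NamedThreadFactory("delay-order-poller"), new ThreadPoolExecutor.AbortPolicy());
        // Poll Redis immediately, then every 3 minutes after the previous run finishes.
        scheduler.scheduleWithFixedDelay(new DelayOrderTask(), 0, 3, TimeUnit.MINUTES);
    }
}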

This completes the delayed-send feature built on Redis plus a polling thread.

Conclusion

Let's compare the advantages and disadvantages of the two implementations above:

  • The first approach is simple to implement and does not rely on any external component, so it can deliver the target function quickly. However, its disadvantage is also obvious: the unbounded queue can blow up under a large message volume, so it only suits specific, low-traffic scenarios.

  • The second approach is a bit more complex to implement, but it can handle a large volume of messages. Using Redis's ZSET as a "middleware" buffer helps us implement the delay function while adapting to high-concurrency scenarios. The disadvantage is that more practical factors have to be considered while coding, such as the thread's execution interval (which may add some delay to the actual send time), the batch size setting, and so on.

To sum up, these are the two ways I tried to implement the delay function; which one to adopt should be chosen according to the actual situation, and I hope this is helpful to you. PS: due to my limited technical ability, there may be inaccurate or wrong technical descriptions in this article. Please point them out and I will correct them immediately to avoid misleading anyone. Thank you!