In the last article (turn to the right), we introduced the ZSet structure of Redis. At the end of the article, we mentioned three usage scenarios of delay queue, leaderboard and traffic limiting. This article shows the specific use methods and problems for your reference. Code samples, just demos, need to be considered in more detail for production use.

1. Delay queue

Zset sorts by score if score represents the timestamp of the desired execution time. Insert it into the zset collection at some point, and it will be sorted by timestamp size, that is, before and after the execution time.

If the current timestamp is greater than or equal to the score of the key value, it will be taken out for consumption deletion, which can achieve the purpose of delayed execution.



Send a message

The code is as follows:

public void sendMessage(long messageId, String message) {
    System.out.println("Send message"); Jedis client = jedisPool.getResource(); Pipeline pipeline = client.pipelined(); Pipeline. Zadd (DELAY_QUEUE, system.currentTimemillis () + DELAY_TIME * 1000, String.format(MEMBER_PREFIX, messageId)); Map<String, String> map = new HashMap<>(); map.put(String.format(MEMBER_PREFIX, messageId), message); pipeline.hset(DELAY_MESSAGE, map); pipeline.syncAndReturnAll(); pipeline.close(); client.close(); System.out.println("Send message over");
}Copy the code

Write to zset and hash simultaneously using pipeline

News consumption

The code is as follows:

public void consumer() {
    System.out.println("Consumption news is on.");
    Jedis client = jedisPool.getResource();
    Set<Tuple> tupleSet = client.zrangeByScoreWithScores(DELAY_QUEUE, 0, System.currentTimeMillis());
    for (Tuple t : tupleSet) {
        long messageId = Long.valueOf(t.getElement().replaceAll("[^ 0-9]." ".""));
        messageHandler(messageId);
    }
    client.close();
    System.out.println("Consumer message over");
}

public void messageHandler(long messageId) {
    System.out.println("= = ="); Pool.execute (() -> {// put Jedis client = jedispool.getResource (); String message = client.hget(DELAY_MESSAGE, String.format(MEMBER_PREFIX, messageId)); System.out.println("Processing message body" + message);
        System.out.println("Message body processing successful");
        Pipeline pipeline = client.pipelined();
        pipeline.multi();
        pipeline.hdel(DELAY_MESSAGE, String.format(MEMBER_PREFIX, messageId));
        pipeline.zrank(DELAY_QUEUE, String.format(MEMBER_PREFIX, messageId));
        pipeline.exec();
        pipeline.close();
        client.close();
    });
}Copy the code

The problem

  1. Without AN ACK mechanism, what happens to the queue when a consumption fails?

  2. This is topic mode. How about broadcast mode

Example code is demo, simple application, put into production also need to consider all kinds of details


2. List

If you spend a lot of time in the tech community, you’re probably familiar with lists like “Most Popular in an hour.” How do you do that? If recorded in a database, real-time statistics are not easily distinguishable. We take the timestamp of the current hour as the key of Zset, the post ID as member, the number of clicks and comments as score, etc., and update score when the score changes. Use ZREVRANGE or ZRANGE to find the corresponding number of records.



Record number of replies

The code is as follows:

/** @param id */ public void post(long id) {Jedis client = jedispool.getResource (); client.zincrby(POSTLIST, 1, String.format(MEMBER_PREFIX, id)); client.close(); }Copy the code

To obtain a list of

The code is as follows:

/** * get Top post list ID ** @param size * @return
*/
public List<Integer> getTopList(int size) {
    List<Integer> result = new ArrayList<>();
    if (size <= 0 || size > 100) {
        return result;
    }
    Jedis client = jedisPool.getResource();
    Set<Tuple> tupleSet = client.zrevrangeWithScores(POSTLIST, 0, size - 1);
    client.close();
    for (Tuple tuple : tupleSet) {
        String t = tuple.getElement().replaceAll("[^ 0-9]." "."");
        result.add(Integer.valueOf(t));
    }
    return result;
}Copy the code


Simulate user Posting behavior

The code is as follows:

public void test() throws InterruptedException {
    int threadSize = 200;
    long[] ids = {100, 102, 103, 104, 105, 106, 101, 108, 107, 200, 109, 201, 202};
    CountDownLatch countDownLatch = new CountDownLatch(threadSize);
    for (int i = 0; i < threadSize; i++) {
        pool.execute(() -> {
            for (int j = 0; j < 3; j++) {
                Random r = new Random();
                int index = (int) (r.nextDouble() * ids.length);
                post(ids[index]);
            }
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
}Copy the code

The problem

  1. If the number is too large, it consumes a lot of memory and needs to clean up a lot of cold data

  2. It is suitable for dealing with clicks, visits and so on. It is also necessary to consider replies to posts that are not approved

3. The current limit

Sliding Windows are a common strategy for limiting traffic. If we define a zset with a user ID as the key, member or score will be the timestamp of the access. We only need to count the number of keys in the specified timestamp interval to obtain the access frequency in the user sliding window, and compare it with the maximum pass times to decide whether to allow the pass.

The sliding window



The code is as follows:

/** ** @param userId * @param period Window size * @param maxCount Maximum frequency limit * @return
*/
public boolean isActionAllowed(String userId, int period, int maxCount) {
    String key = String.format(KEY, userId);
    long nowTs = System.currentTimeMillis();
    Jedis client = jedisPool.getResource();
    Pipeline pipe = client.pipelined();
    pipe.multi();
    pipe.zadd(key, nowTs, String.format(MEMBER, userId, nowTs));
    pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
    Response<Long> count = pipe.zcard(key);
    pipe.expire(key, period + 1);
    pipe.exec();
    pipe.close();
    client.close();
    return count.get() <= maxCount;
}Copy the code

The idea is that when each request comes in, all records outside the time window are cleared, and only records inside the window are kept. In zset, only score value is important. Value value has no special meaning. It is only necessary to ensure that it is unique

The problem

  1. Additional data needs to be cleaned up

  2. When the number of limited requests is too large, it consumes a lot of memory

This time to share the first here, welcome to follow me to communicate, at any time to point out all kinds of mistakes and deficiencies.