In the previous article, we introduced Redis's ZSet (sorted set) structure and, at the end, mentioned three usage scenarios: delay queues, leaderboards, and rate limiting. This article shows a concrete implementation of each, along with its open problems, for your reference. The code samples are demos only; production use needs more careful consideration.
1. Delay queue
A zset orders its members by score. If the score is the timestamp at which a message should be executed, members inserted at any point are kept sorted by that timestamp, that is, in execution order.
A consumer then looks for members whose score is less than or equal to the current timestamp, takes them out, processes them, and deletes them, which achieves delayed execution.
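The ordering rule above can be tried without a Redis server. The following is a minimal sketch that uses a `TreeMap` as an in-memory stand-in for the zset; the class and method names (`DelayQueueSketch`, `add`, `pollDue`) are hypothetical and exist only for illustration, and the sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in for the zset (illustrative only, not Redis, not thread-safe). */
final class DelayQueueSketch {
    // score (due time in ms) -> members that become due at that time
    private final TreeMap<Long, List<String>> byScore = new TreeMap<>();

    /** Plays the role of ZADD with score = now + delay. */
    void add(String member, long nowMs, long delayMs) {
        byScore.computeIfAbsent(nowMs + delayMs, k -> new ArrayList<>()).add(member);
    }

    /** Removes and returns every member whose score <= nowMs, earliest first. */
    List<String> pollDue(long nowMs) {
        List<String> due = new ArrayList<>();
        // headMap(..., true) is a live view of all entries with score <= nowMs
        Map<Long, List<String>> head = byScore.headMap(nowMs, true);
        for (List<String> members : head.values()) {
            due.addAll(members);
        }
        head.clear(); // clearing the view deletes those entries from the backing map
        return due;
    }
}
```

A message added with a 500 ms delay is not returned by `pollDue` until the supplied clock value reaches its due time, and it is returned only once.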
Send a message
The code is as follows:
public void sendMessage(long messageId, String message) {
    System.out.println("Send message");
    Jedis client = jedisPool.getResource();
    Pipeline pipeline = client.pipelined();
    // score = time at which the message becomes due
    pipeline.zadd(DELAY_QUEUE, System.currentTimeMillis() + DELAY_TIME * 1000, String.format(MEMBER_PREFIX, messageId));
    // the message body is stored in a hash under the same member name
    Map<String, String> map = new HashMap<>();
    map.put(String.format(MEMBER_PREFIX, messageId), message);
    pipeline.hset(DELAY_MESSAGE, map);
    pipeline.syncAndReturnAll();
    pipeline.close();
    client.close();
    System.out.println("Send message over");
}
The pipeline sends the zset write and the hash write together in one round trip.
Consuming messages
The code is as follows:
public void consumer() {
    System.out.println("Consuming messages");
    Jedis client = jedisPool.getResource();
    // every member with a score between 0 and now is due
    Set<Tuple> tupleSet = client.zrangeByScoreWithScores(DELAY_QUEUE, 0, System.currentTimeMillis());
    for (Tuple t : tupleSet) {
        // strip the non-digit prefix to recover the numeric message ID
        long messageId = Long.parseLong(t.getElement().replaceAll("[^0-9]", ""));
        messageHandler(messageId);
    }
    client.close();
    System.out.println("Consumer message over");
}
public void messageHandler(long messageId) {
    System.out.println("===");
    // process the message on the thread pool
    pool.execute(() -> {
        Jedis client = jedisPool.getResource();
        String message = client.hget(DELAY_MESSAGE, String.format(MEMBER_PREFIX, messageId));
        System.out.println("Processing message body: " + message);
        System.out.println("Message body processed successfully");
        // delete the body and the queue entry in one transaction
        Pipeline pipeline = client.pipelined();
        pipeline.multi();
        pipeline.hdel(DELAY_MESSAGE, String.format(MEMBER_PREFIX, messageId));
        // remove the entry from the queue so it is not consumed again
        pipeline.zrem(DELAY_QUEUE, String.format(MEMBER_PREFIX, messageId));
        pipeline.exec();
        pipeline.close();
        client.close();
    });
}
Problems
- There is no ACK mechanism. If consumption fails, how should the message be handled in the queue?
- This is topic mode. How would broadcast mode be supported?
The sample code is a simple demo; putting it into production requires attention to many more details.
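One possible direction for the ACK question is to avoid deleting a message when it is claimed: its score is pushed forward by a visibility timeout instead, and it is removed only when the consumer acknowledges success. The sketch below models that idea in memory with a plain map; it is not the approach used in the demo above, the names (`AckDelayQueueSketch`, `claimDue`, `ack`) are hypothetical, and in Redis the claim step would have to be made atomic, for example with a Lua script.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of "claim by re-scoring, delete on ACK" (illustrative only). */
final class AckDelayQueueSketch {
    // member -> score (time in ms at which the member is next due)
    private final Map<String, Long> scores = new HashMap<>();

    void add(String member, long dueAtMs) {
        scores.put(member, dueAtMs);
    }

    /**
     * Claims every due member by moving its score to now + timeout.
     * If no ACK arrives, the member becomes due again after the timeout.
     * The order of the returned list is unspecified in this sketch.
     */
    synchronized List<String> claimDue(long nowMs, long visibilityTimeoutMs) {
        List<String> claimed = new ArrayList<>();
        for (Map.Entry<String, Long> e : scores.entrySet()) {
            if (e.getValue() <= nowMs) {
                claimed.add(e.getKey());
                e.setValue(nowMs + visibilityTimeoutMs);
            }
        }
        return claimed;
    }

    /** Acknowledges successful processing; true only if the member was still queued. */
    synchronized boolean ack(String member) {
        return scores.remove(member) != null;
    }
}
```

With this shape, a consumer that crashes mid-processing simply never calls `ack`, and the message is redelivered once the timeout passes. The trade-off is that handlers must tolerate duplicate delivery.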
2. Leaderboard
If you spend a lot of time in tech communities, you are probably familiar with lists like “Most popular in the last hour.” How is that done? If the data is only recorded in a database, real-time statistics are hard to produce. Instead, we use the timestamp of the current hour as the zset key, the post ID as the member, and the number of clicks, comments, and so on as the score, updating the score whenever it changes. ZREVRANGE or ZRANGE then returns the required number of records.
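The "timestamp of the current hour as the key" step can be sketched as follows. This is a minimal illustration that assumes a key prefix of `hot:posts:` and epoch seconds for the hour start; both are hypothetical choices, not taken from the demo code in this article.

```java
/** Derives one zset key per hour (illustrative only; the prefix is an assumption). */
final class HourlyKeySketch {
    private static final long MS_PER_HOUR = 3_600_000L;

    /** Returns the key for the hour that contains epochMs (non-negative). */
    static String keyFor(long epochMs) {
        // integer division drops the minutes, seconds and millis within the hour
        long hourStartSeconds = (epochMs / MS_PER_HOUR) * 3_600L;
        return "hot:posts:" + hourStartSeconds;
    }
}
```

Every request within the same hour maps to the same key, so increments for that hour accumulate in one zset, and the next hour starts from an empty one.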
Record number of replies
The code is as follows:
/**
 * Record one reply for a post.
 *
 * @param id post ID
 */
public void post(long id) {
    Jedis client = jedisPool.getResource();
    // increment the post's score by 1
    client.zincrby(POSTLIST, 1, String.format(MEMBER_PREFIX, id));
    client.close();
}
Getting the list
The code is as follows:
/**
 * Get the IDs of the top posts.
 *
 * @param size number of posts to return (1 to 100)
 * @return post IDs, highest score first
 */
public List<Integer> getTopList(int size) {
    List<Integer> result = new ArrayList<>();
    if (size <= 0 || size > 100) {
        return result;
    }
    Jedis client = jedisPool.getResource();
    Set<Tuple> tupleSet = client.zrevrangeWithScores(POSTLIST, 0, size - 1);
    client.close();
    for (Tuple tuple : tupleSet) {
        // strip the non-digit prefix to recover the numeric post ID
        String t = tuple.getElement().replaceAll("[^0-9]", "");
        result.add(Integer.valueOf(t));
    }
    return result;
}
Simulating user posting behavior
The code is as follows:
public void test() throws InterruptedException {
    int threadSize = 200;
    long[] ids = {100, 102, 103, 104, 105, 106, 101, 108, 107, 200, 109, 201, 202};
    CountDownLatch countDownLatch = new CountDownLatch(threadSize);
    for (int i = 0; i < threadSize; i++) {
        pool.execute(() -> {
            for (int j = 0; j < 3; j++) {
                Random r = new Random();
                int index = (int) (r.nextDouble() * ids.length);
                post(ids[index]);
            }
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
}
Problems
- If the data volume is large, it consumes a lot of memory, and large amounts of cold data must be cleaned up.
- It suits counts such as clicks and visits. For replies, you also need to account for replies that are not approved.
3. Rate limiting
Sliding windows are a common strategy for rate limiting. We define a zset keyed by user ID, in which each member and its score carry the timestamp of one access. Counting the members whose score falls inside the window's timestamp range gives the user's access frequency within the sliding window, which is compared with the maximum allowed count to decide whether to let the request through.
The sliding window
The code is as follows:
/**
 * @param userId   user ID
 * @param period   window size, in seconds
 * @param maxCount maximum number of requests allowed in the window
 * @return true if the request is allowed
 */
public boolean isActionAllowed(String userId, int period, int maxCount) {
    String key = String.format(KEY, userId);
    long nowTs = System.currentTimeMillis();
    Jedis client = jedisPool.getResource();
    Pipeline pipe = client.pipelined();
    pipe.multi();
    // record this request, scored by its timestamp
    pipe.zadd(key, nowTs, String.format(MEMBER, userId, nowTs));
    // drop everything that has fallen out of the window (1000L avoids int overflow)
    pipe.zremrangeByScore(key, 0, nowTs - period * 1000L);
    // count what is left in the window
    Response<Long> count = pipe.zcard(key);
    // let the whole key expire once the user goes idle
    pipe.expire(key, period + 1);
    pipe.exec();
    pipe.close();
    client.close();
    return count.get() <= maxCount;
}
The idea is that each incoming request clears all records outside the time window and keeps only the records inside it. In the zset, only the score matters; the member value has no special meaning and only needs to be unique.
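The same record, trim, count, compare sequence can be checked without Redis. The sketch below is an in-memory model that uses one `ArrayDeque` of timestamps per user; the class name `SlidingWindowSketch` is hypothetical, the clock is passed in explicitly so that the behavior is deterministic, and timestamps are assumed to be non-decreasing per user.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the zset sliding window (illustrative only, not Redis). */
final class SlidingWindowSketch {
    // userId -> timestamps (ms) of requests still inside the window, oldest first
    private final Map<String, Deque<Long>> hits = new HashMap<>();

    synchronized boolean isActionAllowed(String userId, long nowMs, int periodSeconds, int maxCount) {
        Deque<Long> window = hits.computeIfAbsent(userId, k -> new ArrayDeque<>());
        window.addLast(nowMs);                       // plays the role of ZADD
        long cutoff = nowMs - periodSeconds * 1000L;
        while (!window.isEmpty() && window.peekFirst() <= cutoff) {
            window.pollFirst();                      // plays the role of ZREMRANGEBYSCORE 0..cutoff
        }
        return window.size() <= maxCount;            // plays the role of ZCARD vs. the limit
    }
}
```

As in the Redis version, a rejected request is still recorded, so a client that keeps retrying while over the limit keeps its own window full.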
Problems
- Expired data has to be cleaned up as an extra step.
- When the permitted number of requests is large, it consumes a lot of memory.
That is all for this article. Feel free to follow me and get in touch, and please point out any mistakes or shortcomings.