A high-concurrency seckill (flash-sale) system based on Redis + Zookeeper + MySQL
The source code Git address is published at the end of the article. Thank you for your support. Author: Lucas
Preface:
On January 12 we published the first article in this seckill-system series. Today we continue optimizing on that foundation.
The first article introduced a JVM local cache + Redis approach. In a distributed deployment, multiple machines (multiple JVMs) can end up with inconsistent local caches (see the problem background in Seckill Systems Part 1, January 12). Today we solve this problem with the distributed coordination framework Zookeeper.
Previous code and problem description:
private static Map<String, Boolean> params = new ConcurrentHashMap<>();

@PostMapping("/secKill/{productId}/{userId}")
public ResultRtn secKill(@PathVariable("productId") Long productId, @PathVariable("userId") Long userId) {
    // Key for the JVM local cache
    String jvmKey = "product" + productId;
    // Redis key for the inventory (declared outside the try so the catch block can see it)
    String redisKey = "product:" + productId + ":stock";
    try {
        // Check the JVM local cache first
        if (params.containsKey(jvmKey)) {
            return ResultRtn.error("This item is sold out.");
        }
        // Decrement the Redis inventory by 1
        Long count = redisTemplate.opsForValue().decrement(redisKey);
        if (count < 0) {
            // Oversold: add the decrement back and mark the item sold out in the JVM cache
            redisTemplate.opsForValue().increment(redisKey);
            params.put(jvmKey, true);
            return ResultRtn.error("This item is sold out.");
        }
        productService.updateStock(productId);
    } catch (Exception e) {
        // On an exception, roll back both caches: remove the JVM flag and add the Redis decrement back
        params.remove(jvmKey);
        redisTemplate.opsForValue().increment(redisKey);
        return ResultRtn.error("Network is congested. Please try again later.");
    }
    return ResultRtn.ok("Snap it up");
}
As before, the catch block rolls back both caches: the JVM flag is removed, and the inventory subtracted from Redis is added back.
The above code is a typical example of using JVM and Redis dual caches to implement a seckill. In a single-server deployment it is safe and should cause few problems. But if the application is deployed on multiple machines, the JVM caches can become inconsistent across machines, which is a serious problem. The scenario is as follows: suppose there is one iPhone 12 left in stock. While thread A on machine A is past the JVM-cache check, request B arrives on machine B, finds the inventory insufficient, and sets the product's key to true in machine B's JVM cache. Machine A then hits an error at the Redis decrement, so the inventory Redis subtracted must be added back; one unit is actually still in stock, but machine B's JVM cache already says the product is sold out. If subsequent requests go to machine B (or machine A goes down and all traffic goes to B), every purchase fails and the stock is never fully sold: the system undersells. The sequence of events:
1. Request A enters machine A and grabs the last item; the Redis inventory is decremented to 0.
2. Before request A finishes, request B enters machine B, finds the inventory insufficient, and sets machine B's JVM cache flag to true.
3. Request A on machine A then throws an exception for some reason and adds the inventory it just deducted back to Redis; because of the exception, the item was not actually purchased.
4. But machine B's JVM cache already marks the product as sold out, so if subsequent requests land on machine B (or machine A goes down and all requests go to B), every attempt fails. That is underselling.
How do we solve this? It comes down to synchronizing the JVM caches: when the JVM cache on machine A changes, the caches on machine B and every other machine in the cluster should be refreshed at the same time.
Using the Distributed Coordination Framework (Zookeeper) to solve the problem of inconsistent JVM cache:
When a request on machine B finds the item sold out, machine B marks the item as true in its JVM cache and registers a Zookeeper watch on the item's node. When machine A later needs to roll back, Zookeeper notifies machine B to delete its JVM cache entry for the item. The source code is as follows:
Configure Zookeeper information:
@Configuration
public class ZookeeperConfig {

    /** Zookeeper IP */
    @Value("${zookeeper.ip}")
    private String ip;

    /** Zookeeper port */
    @Value("${zookeeper.port}")
    private String port;

    /** The service port of the current application, used for logging in tests */
    @Value("${server.port}")
    private String serverPort;

    @Bean
    public ZooKeeper initZookeeper() throws Exception {
        // Create the watcher
        ZookeeperWatcher watcher = new ZookeeperWatcher(serverPort);
        // Create a Zookeeper client (30-second session timeout)
        ZooKeeper zooKeeper = new ZooKeeper(ip + ":" + port, 30000, watcher);
        // Hand the client back to the watcher so it can read node data in callbacks
        watcher.setZooKeeper(zooKeeper);
        return zooKeeper;
    }
}
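The @Value placeholders above would be backed by entries like the following in application.properties. The property names come from the annotations; the values here are only illustrative:

# Zookeeper connection info (example values)
zookeeper.ip=127.0.0.1
zookeeper.port=2181
# Port of this application instance (9001 for machine A, 9002 for machine B)
server.port=9001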
Configuring Zookeeper listening:
@Service
@Slf4j
public class ZookeeperWatcher implements Watcher {

    private ZooKeeper zooKeeper;

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    private String serverPort;

    public String getServerPort() {
        return this.serverPort;
    }

    public ZookeeperWatcher() {
    }

    public ZookeeperWatcher(String serverPort) {
        this.serverPort = serverPort;
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("************************zookeeper***start*****************");
        if (watchedEvent.getType() == Event.EventType.None && watchedEvent.getPath() == null) {
            // Connection event fired on startup: make sure the root node for product flags exists
            log.info("Project started, initializing the zooKeeper root node, port " + getServerPort());
            try {
                String path = Constants.zoo_product_key_prefix;
                if (zooKeeper != null && zooKeeper.exists(path, false) == null) {
                    zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
            try {
                // Path of the node that changed
                String path = watchedEvent.getPath();
                // Read the node data; passing true re-registers this watcher (Zookeeper watches are one-shot)
                String soldOut = new String(zooKeeper.getData(path, true, new Stat()));
                // The product id is the last segment of the path
                String productId = path.substring(path.lastIndexOf("/") + 1);
                // If the node now says "false" (item not sold out), evict this server's JVM cache entry
                if ("false".equals(soldOut)) {
                    log.info("Port " + getServerPort() + ": zooKeeper node " + path + " marked false [item not sold out]");
                    if (RedisZookeeperController.getParams().containsKey(Constants.local_product_key_prefix + productId)) {
                        RedisZookeeperController.getParams().remove(Constants.local_product_key_prefix + productId);
                    }
                }
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
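The Constants class referenced throughout is not shown in the article. Here is a minimal sketch; the exact values are assumptions, chosen so the keys match the patterns the code produces (the Zookeeper root must be an absolute path such as /product, since product nodes are created as zoo_product_key_prefix + "/" + productId):

public class Constants {
    /** Prefix for keys in the JVM local cache (assumed value) */
    public static final String local_product_key_prefix = "product";
    /** Prefix for Redis inventory keys (assumed value) */
    public static final String redis_product_key_prefix = "product:stock:";
    /** Zookeeper root node; per-product flag nodes are created beneath it */
    public static final String zoo_product_key_prefix = "/product";
}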
Modify the seckill interface:
private static ConcurrentHashMap<String, Boolean> params = new ConcurrentHashMap<>();

@PostMapping("/secKill/{productId}/{userId}")
public ResultRtn secKill(@PathVariable("productId") Long productId,
                         @PathVariable("userId") Long userId,
                         @RequestParam("isAndEx") String isAndEx
) throws KeeperException, InterruptedException {
    // Key for the JVM local cache
    String jvmKey = Constants.local_product_key_prefix + productId;
    // Check the JVM local cache first
    if (params.containsKey(jvmKey)) {
        log.info("Local cache says sold out");
        return ResultRtn.ok("This item is sold out.");
    }
    // Redis key for the inventory
    String redisKey = Constants.redis_product_key_prefix + productId;
    // Decrement the Redis inventory by 1
    Long count = redisTemplate.opsForValue().decrement(redisKey);
    try {
        /*
         * [isAndEx] exists only to simulate concurrency in tests, so we can see the effect Zookeeper gives us.
         * Scenario:
         * - Thread A calls application 1 (9001) with isAndEx=1: thread A is stuck here for 10 seconds.
         *   The Redis inventory is already 0; the 10-second countdown starts.
         * - While A is still stuck, thread B calls application 2 (9002) with isAndEx=0, sees the Redis
         *   inventory at 0, returns that the item is sold out, and application 2's JVM cache [params]
         *   is set to true.
         * - After 10 seconds, thread A throws the simulated exception and the decremented inventory is
         *   added back; the stock becomes 1 again.
         * - When application 1's exception adds the inventory back, if Zookeeper did not delete
         *   application 2's JVM cache [params], every later request reaching machine B would fail to
         *   grab the item: the system would undersell.
         */
        if ("1".equals(isAndEx)) {
            Thread.sleep(10000);
            throw new Exception("Simulated exception after a 10-second delay.");
        }
        if (count < 0) {
            // Oversold: add the decrement back and mark the item sold out in the JVM cache
            redisTemplate.opsForValue().increment(redisKey);
            params.put(Constants.local_product_key_prefix + productId, true);
            // Set the "sold out" flag in zooKeeper; the node is /product/{productId} with data "true"
            String productPath = Constants.zoo_product_key_prefix + "/" + productId;
            if (zooKeeper.exists(productPath, true) == null) {
                zooKeeper.create(productPath, "true".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            return ResultRtn.ok("This item is sold out.");
        }
        productService.updateStock(productId);
    } catch (Exception e) {
        // Set the node data to "false" so Zookeeper tells the other servers to evict their sold-out flag
        String path = Constants.zoo_product_key_prefix + "/" + productId;
        if (zooKeeper.exists(path, true) != null) {
            zooKeeper.setData(path, "false".getBytes(), -1);
        }
        params.remove(jvmKey);
        // The Redis decrement happened before the exception, so add it back
        redisTemplate.opsForValue().increment(redisKey);
        return ResultRtn.error("Network is congested. Please try again later.");
    }
    return ResultRtn.ok("Snap it up");
}

public static ConcurrentHashMap<String, Boolean> getParams() {
    return params;
}
Test procedure: we start two applications, application 1 (machine A) on port 9001 and application 2 (machine B) on port 9002. To provoke the concurrency problem, the seckill interface takes a parameter that pauses the 9001 thread for 10 seconds; 9002 is not paused. We call 9001 first, which decrements the inventory by 1 and then sleeps for 10 seconds. If we then hit 9002, the item should appear sold out; repeated calls to 9002 before 9001 returns should log that the local cache says sold out, confirming that 9002's JVM cache flag has been set to true. When 9001 throws its exception 10 seconds later, the Redis inventory is added back, so 9001 must notify 9002: "I failed to grab this item and the inventory was restored; your cache must not say sold out." Otherwise every later request reaching 9002 would fail even though stock remains. After the rollback, calling 9002's seckill interface should succeed. That is exactly what Zookeeper gives us here.
Start two applications in IDEA: copy the original run configuration and set the second one's port to 9002
Set the database item inventory to 1
Call the interface that synchronizes the database inventory to Redis (a sketch of such an endpoint follows)
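The article does not show this synchronization endpoint. A minimal sketch of what it might look like, added to the same controller; the endpoint path and the productService.getStock method are assumptions:

/**
 * Copies the current database stock of a product into Redis so the
 * seckill interface can decrement it. Hypothetical endpoint for test setup.
 */
@PostMapping("/syncStock/{productId}")
public ResultRtn syncStock(@PathVariable("productId") Long productId) {
    // Assumed service method that reads the stock count from MySQL
    Integer stock = productService.getStock(productId);
    redisTemplate.opsForValue().set(Constants.redis_product_key_prefix + productId, String.valueOf(stock));
    return ResultRtn.ok("Stock synchronized to Redis: " + stock);
}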
Call the 9001 seckill interface with isAndEx=1: the Redis inventory is decremented, then the thread is delayed for 10 seconds, giving other threads time to attempt the seckill (note the interface is a POST mapping, so use a tool such as curl or Postman)
http://127.0.0.1:9001/redisZookeeper/secKill/1337041728065032193/1?isAndEx=1
While the 9001 call is still stuck, call the 9002 interface with isAndEx=0
http://127.0.0.1:9002/redisZookeeper/secKill/1337041728065032193/1?isAndEx=0
The interface returns:
{" code ": 0," MSG ":" the goods has been sold out ", "data" : "the goods has been sold out"}Copy the code
After the 9001 call returns (the exception fires and the inventory is rolled back), call the 9002 interface again with isAndEx=0
http://127.0.0.1:9002/redisZookeeper/secKill/1337041728065032193/1?isAndEx=0
Copy the code
The interface returns:
{
    "code": 0,
    "msg": "Snap it up",
    "data": "Snap it up"
}
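ResultRtn, the response wrapper used throughout, is also not shown in the article. A minimal sketch matching the JSON responses above; apart from the code/msg/data fields visible in the output, the factory-method behavior and the error code value are assumptions:

public class ResultRtn {
    private int code;   // 0 on success, as seen in the responses above
    private String msg;
    private Object data;

    public static ResultRtn ok(String msg) {
        ResultRtn r = new ResultRtn();
        r.code = 0;
        r.msg = msg;
        r.data = msg;
        return r;
    }

    public static ResultRtn error(String msg) {
        ResultRtn r = new ResultRtn();
        r.code = 1;     // assumed non-zero error code
        r.msg = msg;
        r.data = msg;
        return r;
    }

    // Getters are needed for JSON serialization
    public int getCode() { return code; }
    public String getMsg() { return msg; }
    public Object getData() { return data; }
}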
The source code is published at: gitee.com/stevenlisw/…