Share ideas is important, but with the train of thought, but there was no implementation code, it will make people feel very flamboyant, working on the program in apes all know, you go to implement a function module, a piece of code, although you have the idea, but the implementation process is very time consuming, especially the code debugging, and various tests, etc. So IN my opinion, thought + code is the main core of a good blog post.
One, foreword
Double tenth shortly after, we all know that in Tmall, jingdong, su ning, and so on electricity network operators website have many seconds kill activities, such as at a certain moment for an original 1999 seconds to bargain now as long as 999 mobile phones, will usher in the peak period of a user request, there may be hundreds of thousands of millions of concurrency, to grab the phone, In the case of high concurrency, the database server or file server application server will be under great pressure, and it may break down in serious cases. Another problem is that the things that kill in seconds have a quantity. For example, a mobile phone has only 10 units of data kill, so in the case of high concurrency, Tens of thousands of data update database (for example, the amount of 10 is robbed one will be reduced by 1 in some records of the data set), at this time the order is very disorderly, it is easy to appear the amount of 10, grab more than 10 people this serious problem. So, how do we solve the problems we are talking about? The techniques I’m going to share can be used to address these issues: distributed locks and task queues.
Second, implementation ideas
1.Redis implements distributed lock thinking
The idea is very simple, the main redis function is setnx(), this should be the most important function to achieve distributed lock. Insert Lock:order into redis by setnx(); insert Lock:order into Redis by setnx Return true if yes, false if no. Of course, it’s a little more complicated than that in my code, which I’ll explain when I analyze it.
2.Redis implements the task queue
The implementation here will use the above Redis distributed locking mechanism, mainly using the ordered collection data structure in Redis. For example, the add() function of Zset is used to join the team, and the getScore() function of zset is used to join the team. You can also pop up several tasks at the top.
That’s a simple way to implement distributed locking and task queuing. If you’re a little confused, look at the code implementation below.
Iii. Code analysis
First, analyze the code implementation of Redis distributed lock
1) To avoid lock failure due to special reasons, the lock will be given a lifetime (by setting the parameters of the lock method or using the default value) after the lock is successfully locked. The lifetime of the lock will be automatically released after the lifetime is exceeded. The default lifetime of the lock is relatively short (seconds). The expire method can be used to extend the lifetime of a lock to an appropriate time, such as within a loop.
2) System-level lock When a process crashes for any reason, the operating system will retrieve the lock itself, so resources will not be lost, but distributed lock is not used. If the process is set for a long time, when the unlock is not invoked due to a process crash or other exceptions, The lock becomes garbage in the remaining time, and other processes or processes cannot enter the locked area after being restarted.
The first is timeout, which is the waiting time for acquiring the lock in a cycle. During this time, it will keep trying to acquire the lock until it times out. If it is 0, it will directly return after the lock fails to be acquired without waiting any more. Another important parameter is timeout, which is the waiting time for the lock acquisition cycle. During this time, it will keep trying to obtain the lock until the timeout. If it is 0, it will directly return after the lock acquisition failure and no longer wait. Another important parameter is timeout, which is the waiting time for the lock acquisition cycle. During this time, it will keep trying to obtain the lock until the timeout. If it is 0, it will directly return after the lock acquisition failure and no longer wait. Expire is another important parameter. This parameter specifies the maximum lifetime of the current lock, in seconds. This parameter must be greater than 0. The most important function of this parameter is explained in (1) above.
We get the current time, and then we get the timeout (which is a timestamp) when the lock failed, and then we get the maximum time the lock can live. The redis key is in this format: Insert data into redis, using the setnx() function, which means insert data if the key does not exist, store the maximum lifetime as a value, and set the expiration time of the key if the insert succeeds. Place the key in the lockedName array and return true. If the key exists, the insert operation will not be performed. There is a serious step here, which is to obtain the remaining time of the current key. If this time is less than 0, the key does not exist, because setNx will create it automatically. Expire will not be called if an instance of a process crashes after setnx succeeds. If the waiting time for lock failure is not set or the maximum waiting time has been exceeded, the loop exits. Otherwise, the lockedName array returns true. If the key exists, the insert operation will not be performed. There is a serious step here, which is to obtain the remaining time of the current key. If this time is less than 0, the key does not exist, because setNx will create it automatically. Expire will not be called if an instance of a process crashes after setnx succeeds. If the waiting time for lock failure is not set or the maximum waiting time has been exceeded, the loop exits. Otherwise, the lockedName array returns true. If the key exists, the insert operation will not be performed. There is a serious step here, which is to obtain the remaining time of the current key. If this time is less than 0, the key does not exist, because setNx will create it automatically. Expire will not be called if an instance of a process crashes after setnx succeeds. If the wait time for lock failure is not set or the maximum wait time has been exceeded, the loop is exited, otherwise the request continues after waitIntervalUs. This is the whole code analysis of locking.
/** * lock * @param [type] $name Lock identifier * @param INTEGER $timeout Specifies the timeout period for the lock to be acquired. * @param INTEGER $EXPIRE Specifies the maximum lifetime (in seconds) of the current lock. The value must be greater than 0. If the lifetime expires, the lock is not released. * @param INTEGER $waitIntervalUs Specifies the timeout interval (in microseconds) * @return [type] [description] */ public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) { if ($name == null) return false; $now = time(); $timeoutAt = $now + $timeout; $expireAt = $now + $expire; $redisKey = "Lock:{$name}"; $result = $this->redisString->setnx($this-> rediskey, $expireAt); if ($result ! $this->redisString->expire($redisKey, $expireAt); $this->lockedNames[$name] = $expireAt; return true; $TTL = $this->redisString-> TTL ($redisKey); // If the TTL is less than 0, the key does not exist, because setnx creates it automatically. If ($TTL < 0) {$this->redisString->set($this-> redisKey); $expireAt); $this->lockedNames[$name] = $expireAt; return true; } / part request lock loop * * * * * * * * * * / / / if not set the lock wait time of failure or already exceeds the maximum waiting time, then exit the if ($timeout < = 0 | | $timeoutAt < microtime (true)) break; $waitIntervalUs ($waitIntervalUs); } return false; }Copy the code
Then look at the unlock code analysis: unlock is much simpler, pass in the parameter is the lock identifier, first determine whether there is a lock, if there is, from redis through the deleteKey() function to delete the lock identifier.
/** * unlock * @param [type] $name [description] * @return [type] [description] */ public function unlock($name) { If ($this->isLocking($name)) {if ($this->redisString->deleteKey("Lock:$name")) {$this->redisString->deleteKey("Lock:$name")) {$this->isLocking($name) unset($this->lockedNames[$name]); return true; } } return false; }Copy the code
In the paste to delete all the lock method, in fact, are the same, more than a loop traversal.
Return [type] [description] */ public function unlockAll() {$allSuccess = true; foreach ($this->lockedNames as $name => $expireAt) { if (false === $this->unlock($name)) { $allSuccess = false; } } return $allSuccess; }Copy the code
The above is to use Redis to achieve distributed lock the whole set of ideas and code implementation of the summary and share, here I attach is an implementation of the class code, code I basically annotated each line, convenient for everyone to quickly understand and can simulate the application. For a deeper look at the entire class code:
/** * class RedisLock {private $redisString; private $lockedNames = []; public function __construct($param = NULL) { $this->redisString = RedisFactory::get($param)->string; } /** ** lock * @param [type] $name Lock identifier * @param INTEGER $timeout specifies the timeout period in which the lock will be attempted until the timeout period. * @param INTEGER $EXPIRE Specifies the maximum lifetime (in seconds) of the current lock. The value must be greater than 0. If the lifetime expires, the lock is not released. * @param INTEGER $waitIntervalUs Specifies the timeout interval (in microseconds) * @return [type] [description] */ public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) { if ($name == null) return false; $now = time(); $timeoutAt = $now + $timeout; $expireAt = $now + $expire; $redisKey = "Lock:{$name}"; $result = $this->redisString->setnx($this-> rediskey, $expireAt); if ($result ! $this->redisString->expire($redisKey, $expireAt); $this->lockedNames[$name] = $expireAt; return true; $TTL = $this->redisString-> TTL ($redisKey); // If the TTL is less than 0, the key does not exist, because setnx creates it automatically. If ($TTL < 0) {$this->redisString->set($this-> redisKey); $expireAt); $this->lockedNames[$name] = $expireAt; return true; } / part request lock loop * * * * * * * * * * / / / if not set the lock wait time of failure or already exceeds the maximum waiting time, then exit the if ($timeout < = 0 | | $timeoutAt < microtime (true)) break; $waitIntervalUs ($waitIntervalUs); } return false; } @param [type] $name [description] * @return [type] [description] */ public function unlock($name) { If ($this->isLocking($name)) {if ($this->redisString->deleteKey("Lock:$name")) {$this->redisString->deleteKey("Lock:$name")) {$this->isLocking($name) unset($this->lockedNames[$name]); return true; } } return false; } @return [type] [description] */ public function unlockAll() {$allSuccess = true; foreach ($this->lockedNames as $name => $expireAt) { if (false === $this->unlock($name)) { $allSuccess = false; } } return $allSuccess; } /** * adds the specified lifetime to the current set, Must be greater than 0 * @param [type] $name [description] * @return [type] [description] */ public function expire($name, $expire = Max ($expire, 1); $expire = Max ($expire, 1); $expire = Max ($expire, 1); If ($this->redisString->expire("Lock:$name", $expire)) {return true; } } return false; } @param [type] $name [description] * @return Boolean [description] */ public function $locking ($name) {if ($this->lockedNames[$name]) {$locking ($name) {if ($this->lockedNames[$name]) (string)$this->lockedNames[$name] = (string)$this->redisString->get("Lock:$name"); } return false; }} Redis implements distributed lockingCopy the code
Two) Using Redis to achieve the task queue code analysis
1) Task queue, which is used to queue the operations that can be processed asynchronously in the business logic, and then queue out after processing in other threads
2) Distributed locks and other logic are used in queues to ensure the consistency of queue entry and queue exit
3) This queue is different from ordinary queues. The ID used to join the queue is used to distinguish repeated entries. There is only one record in the queue
First look at the team code analysis: First of all, of course, is to test the legality of parameters, and then use the above the contents of the locking mechanism, is to lock, team here I choose the current timestamp as a score, then there is the team, using the zset data structure of the add () method, after the completion of the team, to unlock the task, which finished the operation of a team.
Join a Task * @param [type] $name Queue name * @param [type] $ID Task ID (or its array) * @param INTEGER $timeout Queue timeout duration (s) * @param integer $afterInterval [description] * @return [type] [description] */ public function enqueue($name, $id, $timeout = 10 and $afterInterval = 0) {/ / legitimacy to detect the if (empty ($name) | | the empty ($id) | | $timeout < = 0) return false. // lock if (! $this->_redis->lock->lock("Queue:{$name}", $timeout)) { Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id"); return false; $score = microtime(true) + $afterInterval; Foreach ((array)$id as $item) {if (false === = $this->_redis->zset->getScore("Queue: name", $item)) { $this->_redis->zset->add("Queue:$name", $score, $item); }} / / unlock $this - > _redis - > lock - > unlock (" Queue: $name "); return true; }Copy the code
Next, let’s look at the code analysis for queueing: queueing a Task requires specifying its $ID and $id
Score, if
Score: If score matches the queue, the Task is removed from the queue. Otherwise, the Task is considered to have been added to the queue. The current operation is treated as failure. $score = $score (); $score = $score (); $score = $score (); $score = $score (); Delete the task ID using the delete() method in zset. This is the code analysis of the exit team.
$id = $score; $score = $score; $score = $score; $score = $score; * * @param [type] $name Queue name * @param [type] $id task id * @param [type] $score task score * @param INTEGER $timeout Specifies the timeout period (s) * @return [type] Specifies whether the Task succeeded. False indicates that the redis operation failed. It could also be $score and the queue value does not match (this means that the Task by other threads since access to the local team) * / public function to dequeue ($name, $id, $score, $timeout = 10) {/ / legitimacy to detect the if (empty ($name) | | the empty ($id) | | the empty ($score)) return false. // lock if (! $this->_redis->lock->lock("Queue:$name", $timeout)) { Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id"); return false; $this-> _queue ->zset->getScore("Queue: name", $id); $result = false; If ($serverScore == $score) {if ($id == $result) (float)$this->_redis->zset->delete("Queue:$name", $id); if ($result == false) { Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id"); }} / / unlock $this - > _redis - > lock - > unlock (" Queue: $name "); return $result; }Copy the code
Studied the course data structure’s friends should know, queue operations and pop up the top of a value method and so on, processing operations team out of the team here, and I also implements the queue at the top of a number of Task and gives the method of team, would like to know friends can look at the code, if see not too clear message, here I am no longer to analyze it.
* @param [type] $name Queue name * @param INTEGER $count Number of tasks * @param INTEGER $timeout Timeout period * @return [type] returned array [0 = > [' id '= >' score '= >], 1 = > [' id' = > 'score' = >], 2 = > [' id '= >, 'score'=> ]] */ public function pop($name, $count = 1, $timeout = 10) {/ / legitimacy to detect the if (empty ($name) | | $count < = 0) return []; // lock if (! $this->_redis->lock->lock("Queue:$name")) { Log::get('queue')->error("pop faild because of pop failure: name = $name, count = $count"); return false; $result = []; $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]); Foreach ($array as $id =>$score) {$result[] = ['id'=>$id, 'score'=>$score]; $this->_redis->zset->delete("Queue:$name", $id); } / / unlock $this - > _redis - > lock - > unlock (" Queue: $name "); return $count == 1 ? (empty($result) ? false : $result[0]) : $result; }Copy the code
The above is the summary and sharing of the whole set of ideas and code implementation of the task queue with Redis. Here I attach the code of an implementation class, and I basically annotated each line in the code, so that we can quickly understand and simulate the application. For a deeper look at the entire class code:
/** */ class RedisQueue {private $_redis; public function __construct($param = null) { $this->_redis = RedisFactory::get($param); } /** * join a Task * @param [type] $name Queue name * @param [type] $ID Task ID (or its array) * @param INTEGER $timeout Queue timeout time (s) * @param integer $afterInterval [description] * @return [type] [description] */ public function enqueue($name, $id, $timeout = 10 and $afterInterval = 0) {/ / legitimacy to detect the if (empty ($name) | | the empty ($id) | | $timeout < = 0) return false. // lock if (! $this->_redis->lock->lock("Queue:{$name}", $timeout)) { Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id"); return false; $score = microtime(true) + $afterInterval; Foreach ((array)$id as $item) {if (false === = $this->_redis->zset->getScore("Queue: name", $item)) { $this->_redis->zset->add("Queue:$name", $score, $item); }} / / unlock $this - > _redis - > lock - > unlock (" Queue: $name "); return true; $id = $score = $score = $score = $score = $score = $score = $score = $score; * * @param [type] $name Queue name * @param [type] $id task id * @param [type] $score task score * @param INTEGER $timeout Specifies the timeout period (s) * @return [type] Specifies whether the Task succeeded. False indicates that the redis operation failed. It could also be $score and the queue value does not match (this means that the Task by other threads since access to the local team) * / public function to dequeue ($name, $id, $score, $timeout = 10) {/ / legitimacy to detect the if (empty ($name) | | the empty ($id) | | the empty ($score)) return false. // lock if (! $this->_redis->lock->lock("Queue:$name", $timeout)) { Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id"); return false; $this-> _queue ->zset->getScore("Queue: name", $id); $result = false; If ($serverScore == $score) {if ($id == $result) (float)$this->_redis->zset->delete("Queue:$name", $id); if ($result == false) { Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id"); }} / / unlock $this - > _redis - > lock - > unlock (" Queue: $name "); return $result; } @param [type] $name Queue name * @param INTEGER $count Number of tasks * @param INTEGER $timeout timeout period * @ return [type] returned array [0 = > [' id '= >' score '= >], 1 = > [' id' = > 'score' = >], 2 = > [' id '= >, 'score'=> ]] */ public function pop($name, $count = 1, $timeout = 10) {/ / legitimacy to detect the if (empty ($name) | | $count < = 0) return []; // lock if (! $this->_redis->lock->lock("Queue:$name")) { Logger::get('queue')->error("pop faild because of pop failure: name = $name, count = $count"); return false; $result = []; $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]); Foreach ($array as $id =>$score) {$result[] = ['id'=>$id, 'score'=>$score]; $this->_redis->zset->delete("Queue:$name", $id); } / / unlock $this - > _redis - > lock - > unlock (" Queue: $name "); return $count == 1 ? (empty($result) ? false : $result[0]) : $result; } @param [type] $name Queue name * @param INTEGER $count Number * @return [type] Return array [0=>['id'=>, 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]] */ public function top($name, $count = 1) {/ / legitimacy to detect the if (empty ($name) | | $count < 1) return []; $result = []; $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]); / / will be foreach Task stored in an array ($array as $id = > $score) {$result [] = [' id '= > $id,' score '= > $score]; } return $count == 1? (empty($result) ? false : $result[0]) : $result; }} Redis implements the task queueCopy the code
To this, the basic interpretation of the two big functions, for the task queue, you can write a shell script, let the server regularly run certain programs, implementation team to wait in the operation, I would not be in here to be combined with the practical application to be realized, good understanding of the realization of the function of the two big ideas can, because the code with PHP language to write, If you understand the implementation, you can use Java or.net or other languages to implement these two features. These two functions can be used in many scenarios, especially the second kill. Another one is snatching train tickets during the Spring Festival Travel rush. These two are the most vivid examples. Of course, there are many other applications, and I won’t list them all here.
Pay attention and don’t get lost
All right, everybody, that’s all for this article. All the people here are talented. As I said before, there are many technical points in PHP, because there are too many, it is really difficult to write, you will not read too much after writing, so I have compiled it into PDF and document, if necessary
Click on the code: PHP+ “platform”
As long as you can guarantee your salary to rise a step (constantly updated)
I hope the above content can help you. Many PHPer will encounter some problems and bottlenecks when they are advanced, and they have no sense of direction when writing too many business codes. I have sorted out some information, including but not limited to: Distributed architecture, high scalability, high performance, high concurrency, server performance tuning, TP6, Laravel, YII2, Redis, Swoole, Swoft, Kafka, Mysql optimization, shell scripting, Docker, microservices, Nginx, etc