instructions
Think-queue is an extension to TP5.1. x that requires installation to use
Following is the basic usage, and this article follows a database-driven model
Install the think-queue extension
Note: The latest version of Think-Queue requires tp6.x support, so the installation version of this article is 2.0.4
Composer require topthink/think - queue 2.0.4
Create a table
CREATE TABLE `prefix_jobs` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`queue` varchar(255) NOT NULL,
`payload` longtext NOT NULL,
`attempts` tinyint(3) unsigned NOT NULL,
`reserved` tinyint(3) unsigned NOT NULL,
`reserved_at` int(10) unsigned DEFAULT NULL,
`available_at` int(10) unsigned NOT NULL,
`created_at` int(10) unsigned NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Copy the code
3. Modify the configuration file and add the corresponding driver configuration
File location: root directory /config/queue.php
Return ['connector' => 'Database', // Database driver 'expire' => 60, // Task expire time, default is 60 seconds; To disable, set to NULL 'default' => 'default', // Default queue name 'table' => 'jobs', // table name for storing messages without prefix 'DSN' => [],]Copy the code
Expire parameter description in the configuration file The EXPIRE parameter refers to the expire time of the task, in seconds. + expire > When expire is not null, thinkphp-queue checks and reissues expired tasks before fetching the next task. When expire is null, thinkphp-queue does not check expired tasks, which is relatively high performance. Note, however, that these timed out tasks remain in the message queue and need to be handled by the developer (either deleted or resent)!
4. Message creation and push
1. Create a new controller: \application\ Index \controller\ jobtest.php 2. Add a new method to the controller: test() 3
<? php namespace app\admin\controller; use think\Queue; use think\Controller; class JobTest extends Controller { public function test() { // 1. Which class will handle the current task? // When it is the task's turn, the system will generate an instance of the class and call its fire method $jobHandlerClassName = 'app\ Jobs \JobTest'; $jobQueueName = "helloJobQueue"; $jobQueueName = "helloJobQueue"; // 3. The service data required by the current task cannot be of the resource type. Other types are eventually converted to json strings. Store the key values of the public property to) $jobData = [' ts' = > time (), 'bizId' = > uniqid (), 'a' = > 1); $isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName); / / database driver, the return value is 1 | false; Redis drive, the return value for the random string | false if ($isPushed! == false ){ echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "<br>"; }else{ echo 'Oops, something went wrong.'; }}}Copy the code
V. Consumption and deletion of messages
1. Add a new consumer class: \ Application \ Jobs \ jobtest.php
<? php namespace app\jobs; use think\queue\Job; use app\common\model\JobsTest as JobsTestModel; Class JobTest {/ * * * fire method is the default message queue the method called * @ param Job $Job current task object * @ param array | mixed $data release task * / public custom data Function fire(Job $Job, $data) { Determine in advance whether to need to perform $isJobStillNeedToBeDone = $this - > checkDatabaseToSeeIfJobNeedToBeDone ($data); if(! $isJobStillNeedToBeDone){ $job->delete(); return; $isJobDone = $this->doHelloJob($data); $job->delete(); if ($isJobDone) {$job->delete(); } else {if ($job->attempts() > 3) {$job->delete(); $job->release(2); $delay = $delay; }}} /** * Some messages may not need to be executed by the time they reach consumers * @param $data User-defined data when publishing tasks * @return bool Result of task execution */ private function checkDatabaseToSeeIfJobNeedToBeDone($data){ return true; } /** * Actual business processing based on the data in the message... * @param $data * @return bool */ private function doHelloJob($data) { $model = new JobsTestModel(); $inData = [' uniqId '= > $data [' uniqId],' time '= > $data [' ts'],' content '= >' queue insert data of success]. $res = $model->save($inData); if (! $res) { return false; } return true; }}Copy the code
At this point, our code is complete
Debug/test
1. Create table jobs_test
CREATE TABLE `st_jobs_test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`uniqId` varchar(255) DEFAULT NULL,
`time` varchar(255) DEFAULT NULL,
`content` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
Copy the code
2. Publish tasks: Perform the message creation method http://xxx/admin/job_test/test
If the operation is successful, you can see the page return display: “2020-08-07 10:53:00 A new Hello Job is Pushed to the MQ”
id | queue | payload | attempts | reserved | reserved_at | available_at | created_at |
---|---|---|---|---|---|---|---|
6 | helloJobQueue | {“job”:”app\jobs\JobTest”,”data”:{“ts”:1596769329,”uniqId”:”5f2cc4317c6a3″}} | 0 | 0 | 1596769329 | 1596769329 | |
7 | helloJobQueue | {“job”:”app\jobs\JobTest”,”data”:{“ts”:1596769332,”uniqId”:”5f2cc4349c42d”}} | 0 | 0 | 1596769332 | 1596769332 | |
8 | helloJobQueue | {“job”:”app\jobs\JobTest”,”data”:{“ts”:1596769334,”uniqId”:”5f2cc4367f8f1″}} | 0 | 0 | 1596769334 | 1596769334 | |
9 | helloJobQueue | {“job”:”app\jobs\JobTest”,”data”:{“ts”:1596769335,”uniqId”:”5f2cc437cfcaf”}} | 0 | 0 | 1596769335 | 1596769335 | |
10 | helloJobQueue | {“job”:”app\jobs\JobTest”,”data”:{“ts”:1596769337,”uniqId”:”5f2cc439086dd”}} | 0 | 0 | 1596769337 | 1596769337 |
PHP think queue:work –queue helloJobQueue PHP think queue:work –queue helloJobQueue App \jobs\JobTest” At this point, we check that a message in the database table JOBS has been consumed, and a new consumed data is added to jobs_test
id | uniqId | time | content |
---|---|---|---|
6 | 5f2cc4317c6a3 | 1596769329 | The queue successfully inserted data. Procedure |
At this point, we have successfully gone through the basic process of creating -> pushing -> consuming -> deleting a message
“Appendix”
Use of tp5.1.x queue documentation github.com/coolseven/n…