Abstract: Message queues typically serve two scenarios, publish-subscribe and producer-consumer. This article looks at how to build massive, prioritized producer-consumer queues on top of TableStore's auto-increment primary key feature, using a whole-web crawler task distribution system as the example.

Preface

Message queues typically serve two scenarios: publish-subscribe and producer-consumer. In publish-subscribe mode, a publisher puts a message into a queue and every listening consumer receives that same message, that is, each consumer gets its own copy. In producer-consumer mode, a producer puts a message into a queue and multiple consumers listen on the queue at the same time; whichever consumer grabs the message first takes it out of the queue, so in the end each message is owned by exactly one consumer.

In the era of big data, the number of topics in the traditional producer-consumer queue model may grow from a handful to an enormous number. For example, in a whole-web crawler task scheduling system, every large portal and SNS site becomes a topic, and within each topic there is a large number of sub-pages to crawl. Implementing such a task distribution and scheduling system may run into the following problems:

- The huge number of topics means a huge number of queues. In the crawler scenario, each kind of website corresponds to one task queue according to its page type, and each task queue has its own producers and consumers.
- There can be multiple producers and consumers, and business peaks generate highly concurrent access and a large volume of messages. The total number of crawl-task messages can be on the order of the number of page addresses on the whole web.
- Tasks may have priorities. To let higher-priority tasks be scheduled first, we may need sub-queues under a topic.
- Message consumption must not be lost. For task-scheduling messages there is zero tolerance for loss; if a consumer fails or times out for any reason, the message must be reschedulable.
- While guaranteeing that every message is processed, a small number of long-tail messages (slow or failed for whatever reason) must not drag down the throughput of the whole system. Message reads are lightweight while message processing is resource-intensive, so we do not want processing resources to sit idle while unread messages pile up.

The solution builds on TableStore's high concurrency across partitions, and its auto-increment primary key feature is a good fit for queue semantics: with a massive number of partition key values, each maintaining its own auto-increment primary key column, we can implement a massive number of queues. Concretely, the design is as follows.

The following tables need to be designed:

- Task message table
- Message consumption checkpoint table
- Full message table

Before going into the table design, let me explain a few terms.

For each task message, we assume that we already have a unique ID.

Task priority: we assume the priority range is fixed and known in advance. If tasks have too many distinct priorities, they can be bucketed into levels, for example priorities 1 to 100 map to level 1. If tasks have no priority at all, we can simply bucket them by data volume and round-robin across the buckets when pulling tasks.
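As a minimal sketch of such bucketing (the bucket size of 100 and the 1-based level numbering are assumptions, not part of the original design), the mapping could look like:

```python
def priority_to_level(priority: int, bucket_size: int = 100) -> int:
    """Map a raw priority to a coarser level, e.g. priorities 1-100 -> level 1."""
    return (priority - 1) // bucket_size + 1
```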

For each priority level of each topic we need to record two cursor offsets: a crawl scan cursor and a completion cursor. The scan cursor is the largest position that has been scanned so far under the current priority level of the current task type. The completion cursor is the largest position under that priority level for which the task itself and all tasks before it have finished being crawled.

Table design

Task message table



Each subtask is inserted into this table. Subtasks may be generated by different crawlers working on the same task. The crawl address and priority of a task are fixed, and the priority is mapped to a level by the bucketing algorithm above, so the first primary key columns are already determined when a row is inserted into TableStore (Table Store); the ID column is generated automatically by the auto-increment feature and is used by consumers when reading tasks later.
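To make this concrete, here is a minimal sketch using the TableStore Python SDK (pip install tablestore). The table name, the three primary key columns and the attribute columns are illustrative assumptions, and exact constructor signatures may differ between SDK versions:

```python
from tablestore import (OTSClient, TableMeta, TableOptions, ReservedThroughput,
                        CapacityUnit, Row, Condition, RowExistenceExpectation,
                        PK_AUTO_INCR)

client = OTSClient('<endpoint>', '<access_key_id>', '<access_key_secret>', '<instance>')

# Primary key: site/page type (partition key), priority level, auto-increment task ID.
meta = TableMeta('crawler_task', [('site_type', 'STRING'),
                                  ('priority', 'INTEGER'),
                                  ('task_id', 'INTEGER', PK_AUTO_INCR)])
client.create_table(meta, TableOptions(), ReservedThroughput(CapacityUnit(0, 0)))

# Inserting a subtask: the first two primary key columns are known up front;
# TableStore fills in task_id, appending the row to the tail of that queue.
row = Row([('site_type', 'news_portal'), ('priority', 3), ('task_id', PK_AUTO_INCR)],
          [('url', 'https://example.com/page/1'), ('status', 'INIT')])
client.put_row('crawler_task', row, Condition(RowExistenceExpectation.IGNORE))
```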

Message consumption checkpoint table

This table stores the consumption checkpoints. The checkpoint contents are explained below together with the schema.



This table has two attribute columns: one records the crawl scan cursor and one records the completion cursor. Checkpoint updates need to be conditional, that is, an update succeeds only when the stored value is smaller than the new value.
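A minimal sketch of this conditional update, reusing the illustrative table and column names from the sketch above (the checkpoint table name and its columns are assumptions): the write only succeeds when the stored cursor is smaller than the new value.

```python
from tablestore import (Row, Condition, RowExistenceExpectation,
                        SingleColumnCondition, ComparatorType, OTSServiceError)

def advance_scan_cursor(client, site_type, priority, new_checkpoint):
    """Try to move the scan cursor for one priority forward to new_checkpoint.
    Returns False if another crawler already holds a larger cursor."""
    pk = [('site_type', site_type), ('priority', priority)]
    row = Row(pk, {'put': [('scan_cursor', new_checkpoint)]})
    # Only update when the stored scan_cursor is smaller than the new value
    # (the condition also passes when the column is still missing).
    cond = Condition(RowExistenceExpectation.IGNORE,
                     SingleColumnCondition('scan_cursor', new_checkpoint,
                                           ComparatorType.LESS_THAN))
    try:
        client.update_row('task_checkpoint', row, cond)
        return True
    except OTSServiceError:
        return False  # Condition check failed: someone else advanced it already.
```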

Full message table

We use the full message table to store message IDs and their attributes. Whether a message (task) is being processed repeatedly is also decided through this table.



In the whole-web message table there is an attribute column that records the task's processing status. When a consumer gets a task ID, it needs to conditionally update the corresponding row in this table: if the row does not exist, it inserts it directly; if the row already exists, it changes the status to a non-finished state, with the update conditioned on the value (or version) read beforehand. A successful update means the consumer has preempted the task with this ID. If the row did not exist, this is the first crawl of the page; if the row exists but is not in the finished state, the previous attempt probably failed.
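Here is a minimal sketch of this preemption step (the table and column names, the status values and the use of a status column instead of an explicit version column are assumptions):

```python
from tablestore import (Row, Condition, RowExistenceExpectation,
                        SingleColumnCondition, ComparatorType, OTSServiceError)

def try_preempt(client, url):
    """Return True if this consumer wins the crawl task for `url`."""
    pk = [('url', url)]
    try:
        # First crawl: insert the row only if it does not exist yet.
        client.put_row('web_page', Row(pk, [('status', 'RUNNING')]),
                       Condition(RowExistenceExpectation.EXPECT_NOT_EXIST))
        return True
    except OTSServiceError:
        pass  # The row already exists; try to take over a previously failed task.
    # Read the current status, then update optimistically, conditioned on the
    # value just read (a dedicated version column would work the same way).
    _, return_row, _ = client.get_row('web_page', pk, columns_to_get=['status'])
    attrs = {c[0]: c[1] for c in return_row.attribute_columns} if return_row else {}
    current = attrs.get('status')
    if current == 'DONE':
        return False  # The page is already crawled, nothing to preempt.
    try:
        cond = Condition(RowExistenceExpectation.EXPECT_EXIST,
                         SingleColumnCondition('status', current, ComparatorType.EQUAL))
        client.update_row('web_page', Row(pk, {'put': [('status', 'RUNNING')]}), cond)
        return True  # A timeout check on RUNNING tasks is omitted in this sketch.
    except OTSServiceError:
        return False  # Another consumer changed the row first.
```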

Below, taking a whole-web crawler as an example, let's see how to build a message queue on TableStore and implement task distribution:



The figure above shows the whole crawler framework. The specific crawling process is as follows:

1. Pulling. Different crawlers periodically pull tasks from the crawler task table in TableStore according to their own crawling progress. Each crawler generally reads with a single GetRange thread, since reading tasks here is much faster than the crawlers can consume them. Tasks read from TableStore enter the crawler's in-memory queue before the next round of reads; once the in-memory queue is full, the puller waits to be woken for the next round. If there are special requirements, different priorities can be pulled concurrently. The default checkpoint for each priority of a task type is the TableStore marker INF_MIN, i.e. the first row of that queue. (A sketch of this pull loop follows the list.)

2. Preemption. GetRange pulls up to, say, 200 tasks per priority, scanning priorities from high to low and stopping once enough tasks have been fetched to attempt preemption. For the pulled tasks, again from high priority to low, the crawler tries to conditionally update the web page information table to preempt each crawl task; tasks it wins are put into its in-memory task queue for the crawler threads to consume.

3. Status update. When preemption succeeds, we also update the status and the current time in the crawler task table, recording the task's latest update time. A later task-status inspection thread uses this to check whether the task has expired and needs to be reprocessed. Note that if a crawler thread lags and is stuck for a long time right after winning a preemption, writing this timestamp anyway does no harm: such a low-probability lag may lead to a page being fetched twice, but it does not affect data consistency. We can also record the time of each step in memory and abort the current task when a step times out, further reducing the already small probability of repeated fetching.

4. Checkpoint advance. When a round of pulling finishes, we take the largest task ID obtained in this round plus 1 (the ID is the third primary key column of the crawler task table, i.e. the auto-increment column) and try to update the checkpoint table for the corresponding priority (the update frequency can be chosen freely according to the business). The update rule is that the new ID must be greater than or equal to the currently stored ID. If the update fails, another crawler has already pulled past this position; re-read the checkpoint table to get the latest checkpoint ID and continue pulling from there.

5. Completion scan. Besides the task-pulling thread, each crawler can run a lower-frequency completion-scan thread that advances the completion cursor. The upper bound of its GetRange scan is the start position of the current pull (the scan cursor).
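A sketch of the pull loop in steps 1 and 2, again with the illustrative table and column names used in the earlier sketches (the checkpoint handling and batch size are assumptions):

```python
from tablestore import Direction, INF_MIN, INF_MAX

def pull_tasks(client, site_type, priority, checkpoint=None, batch=200):
    """Pull up to `batch` tasks of one priority, starting at the checkpoint.
    `checkpoint` is the next task_id to read (the design stores max id + 1),
    or None to start from INF_MIN, i.e. the head of the queue."""
    start = [('site_type', site_type), ('priority', priority),
             ('task_id', INF_MIN if checkpoint is None else checkpoint)]
    end = [('site_type', site_type), ('priority', priority), ('task_id', INF_MAX)]
    tasks = []
    while len(tasks) < batch:
        _, next_start, rows, _ = client.get_range(
            'crawler_task', Direction.FORWARD, start, end,
            limit=batch - len(tasks), max_version=1)
        tasks.extend(rows)
        if next_start is None:  # Reached the current tail of this queue.
            break
        start = next_start
    return tasks
```

Each task returned here would then go through the preemption step sketched earlier, and after the batch has been handled the scan checkpoint for this priority would be advanced (max task_id + 1) with a conditional update like advance_scan_cursor above.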

The scan logic of this completion thread is as follows (a code sketch follows the list):

- If the scanned task is already in the finished state, the cursor simply moves down.
- If the scanned task is still in the initial state, i.e. nobody has set it to RUNNING even though it has already been pulled, it is most likely a repeated-fetch task. Check whether the URL already exists in the URL table and skip the task directly if it does.
- If the scanned task is RUNNING and has not timed out, the task is still being executed and the scan stops here. If its timestamp shows a timeout, check the URL table: if the content is already there, the crawler probably just failed to write the status back to the task table, and the cursor keeps moving down. If the content is not there either, the simplest approach is to put a new task into the corresponding priority queue of the task table; the only drawback is that concurrent checks may create duplicate tasks (which can also be eliminated by URL deduplication). Another approach is to let the competing crawlers conditionally update the URL table for this task: the one whose update succeeds creates the new task and continues scanning down until the end of the range or the next normal task before updating the checkpoint, while the others stop scanning and update their checkpoint to their current position.

After each crawl task completes, the crawler updates both the status in the whole-web URL table and the status in the corresponding row of the crawler task table. The status in the whole-web URL table is used to deduplicate subsequent crawl tasks, and the status in the crawler task table is used by the completion-cursor scan thread of step 5 above to judge whether a task has finished.
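As a sketch of the case analysis above (the task/status representation, the timeout value and the url_done/requeue helpers are assumptions supplied by the caller):

```python
import time

RUNNING_TIMEOUT = 600  # seconds a RUNNING task may go without an update (assumed)

def advance_completion_cursor(tasks, url_done, requeue, now=None):
    """Walk `tasks` (ordered by task_id, from the old completion cursor towards
    the scan cursor) and return the new completion cursor, i.e. the last task_id
    up to which everything is finished. Each task is a dict with 'task_id',
    'status', 'url' and 'updated_at'; url_done(url) reports whether the URL table
    already holds the page, and requeue(task) re-inserts a long-tail task."""
    cursor = None
    now = now or time.time()
    for t in tasks:
        if t['status'] == 'DONE':
            cursor = t['task_id']          # Finished: keep moving down.
        elif t['status'] == 'INIT' and url_done(t['url']):
            cursor = t['task_id']          # Duplicate fetch, already crawled elsewhere.
        elif t['status'] == 'RUNNING' and now - t['updated_at'] > RUNNING_TIMEOUT:
            if not url_done(t['url']):
                requeue(t)                 # Long-tail task: put a fresh copy in the queue.
            cursor = t['task_id']          # Either way the cursor can move down.
        else:
            break                          # Still running (or freshly pulled): stop here.
    return cursor
```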



The whole flow of writing subtasks and reading them out can be abstracted as in the figure: new tasks are written concurrently into different queues according to their priority. The numbers in the figure correspond to the auto-increment column in Table Store; if the table structure is designed as above, users do not need to handle write concurrency themselves. The red cursor in the figure is the crawler's task-reading (scan) cursor, and the blue one marks the boundary of completed tasks. The two cursors are maintained independently for each priority.

For example, suppose a crawler's task-pulling thread is configured to pull two tasks at a time.

Then the crawler task table changes from the state above to the one below: task1's priority=3 scan cursor is updated to 10011 and its priority=2 scan cursor to 10006. This means the next priority-3 scan starts from 10011 and the next priority-2 scan starts from 10006.

Concurrency handling: tasks pulled by different crawlers may overlap. Conditional updates on the big whole-web URL table guarantee that the same page is never crawled by two consumers at the same time. Conditional updates on the checkpoint table guarantee that a pulled batch of tasks is never skipped: if a checkpoint update fails (for example because of brief service unavailability or a lagging client), it is simply retried. At worst this wakes other crawlers up to pull overlapping data, but they will not crawl the same pages, because their preemption attempts fail. A newly woken client also pushes the cursor to a larger value, so the system's scan cursor does not fall behind because of one lagging client.

For the task-completion decision logic, we can use a distributed mutex so that only one process scans at a time. Alternatively, when a task is judged to have failed, we conditionally update the original row and insert a new task only after that update succeeds, as sketched below.
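A sketch of that second option, continuing with the illustrative names and status values from the earlier sketches; only the crawler whose conditional update succeeds re-inserts the task, so it could also serve as the requeue callback in the completion-scan sketch above (with site_type and priority added to the task dict):

```python
from tablestore import (Row, Condition, RowExistenceExpectation,
                        SingleColumnCondition, ComparatorType, PK_AUTO_INCR,
                        OTSServiceError)

def requeue_failed_task(client, task):
    """Mark a timed-out task as FAILED (conditioned on it still being RUNNING) and,
    only if that update wins, append a fresh copy to the same priority queue."""
    old_pk = [('site_type', task['site_type']), ('priority', task['priority']),
              ('task_id', task['task_id'])]
    cond = Condition(RowExistenceExpectation.EXPECT_EXIST,
                     SingleColumnCondition('status', 'RUNNING', ComparatorType.EQUAL))
    try:
        client.update_row('crawler_task',
                          Row(old_pk, {'put': [('status', 'FAILED')]}), cond)
    except OTSServiceError:
        return False  # Someone else already handled this task.
    new_pk = [('site_type', task['site_type']), ('priority', task['priority']),
              ('task_id', PK_AUTO_INCR)]
    client.put_row('crawler_task',
                   Row(new_pk, [('url', task['url']), ('status', 'INIT')]),
                   Condition(RowExistenceExpectation.IGNORE))
    return True
```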

Finally, let's check whether the key problems in the overall design are addressed:

- Massive topics: TableStore's natural ability to use each partition key value as a queue makes it easy to implement queues on the order of hundreds of millions or more.
- Priority: the priority corresponds to a primary key column, and GetRange fetches rows of higher priority first.
- System throughput: the two-cursor design lets the task scan cursor move down quickly after each round of scanning, so long-tail tasks do not block the scanning of new tasks; at the same time, preempting tasks through the big URL table avoids unnecessary repeated fetching.
- Subtasks are not lost: the auto-increment column guarantees that a new task is appended to the tail of its queue with a larger value, and the completion-scan thread only advances the completion cursor once every task before it has finished, so this cursor both marks overall completion and guarantees that tasks are not lost. The scan thread re-inserts long-tail tasks as new tasks in the queue, which are then picked up by other crawlers, avoiding starvation caused by a single stuck client.