“This is the 17th day of my participation in the Gwen Challenge in November. Check out the details: The Last Gwen Challenge in 2021.”

Hello, I’m looking at the mountains.

Some systems need to track task status and re-execute a task when it fails. This article describes a solution to that requirement. For lack of time it has not been implemented in production code, but after working through it with a friend, it is the most effective plan we can think of at present. My knowledge is limited, so if any expert has a better solution, please let me know. Thank you very much.

1. Problem description

1.1 One main task, multiple sub-tasks

In current system environments, an application usually runs as multiple instances, scaled horizontally to improve concurrency. Normally, one instance receives a request and processes it. Suppose the request tells that instance to rebuild the index for every item in a category, and the category holds 10,000 items: the instance will spend a significant share of its resources on rebuilding the index for some time. Meanwhile the other instances sit idle, so one worker does the job while everyone else watches.

If the task ends normally, this approach is not too problematic. In an extreme case, however, the instance reindexes 9,999 items successfully and then fails on the last one. The whole task is then marked as failed, and the 9,999 items already rebuilt were done in vain.

1.2 Task status tracking

In a messaging platform, when a received message fails to be delivered to its destination address, delivery must be retried several times at intervals to make sure the message gets through. If sending still fails after several retries, the message status is set to failed and it waits for human intervention.

Suppose the messaging platform or the target service is unreliable, and over time the retry tasks pile up to 3000. These 3000 tasks are distributed unevenly over time. The message identifier is not a serial number, so the tasks cannot be divided into sequence-number ranges. In this case, even though multiple instances can retry messages at the same time, the only way to avoid omissions and duplicates is to group the retries by time period. That leads to uneven task allocation and does not make full use of the cluster’s processing capacity.

2

In fact, the two cases above can be treated as one: a bunch of stateless tasks that need to be executed. To use resources efficiently, multiple instances should not execute the same task at the same time, and a task does not need to be executed again once it has succeeded.

The most straightforward and simplest idea is to provide a system that can store tasks:

  1. Obtain the task list from the storage system, either periodically or by listening for changes
  2. Check whether the task is locked. If it is locked, skip it; if not, lock it
  3. Execute the task
  4. After the task is executed, write the result to the storage system and unlock the task
  5. Repeat from step 1. Tasks that have already succeeded are skipped or archived
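
The steps above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the two in-memory maps stand in for the shared storage system, and the names `TaskStoreSketch`, `runOnce`, `statuses` and `locks` are hypothetical. A real deployment would need a store that offers an atomic "lock if absent" operation across instances.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

public class TaskStoreSketch {
    enum Status { SUCCEEDED, FAILED }

    // Hypothetical in-memory stand-ins for the shared storage system.
    static final Map<String, Status> statuses = new ConcurrentHashMap<>();
    static final Map<String, String> locks = new ConcurrentHashMap<>();

    /** One pass over the task list for one instance; returns how many tasks it executed. */
    static int runOnce(String instanceId, List<String> taskIds, Predicate<String> work) {
        int executed = 0;
        for (String taskId : taskIds) {
            // Step 2: putIfAbsent is atomic, so only one instance can take the lock.
            if (locks.putIfAbsent(taskId, instanceId) != null) {
                continue; // locked by another instance: skip it
            }
            try {
                // Step 5: a task that already succeeded is not executed again.
                if (statuses.get(taskId) == Status.SUCCEEDED) {
                    continue;
                }
                boolean ok = work.test(taskId);                               // step 3
                statuses.put(taskId, ok ? Status.SUCCEEDED : Status.FAILED);  // step 4: write result
                executed++;
            } finally {
                locks.remove(taskId, instanceId);                             // step 4: unlock
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("a", "b", "c");
        // First pass: task "b" fails, the other two succeed.
        System.out.println(runOnce("instance-1", tasks, id -> !id.equals("b")));
        // Second pass: only "b" still needs to run.
        System.out.println(runOnce("instance-2", tasks, id -> true));
    }
}
```

Releasing the lock in `finally` means a task whose execution throws is left without a status, so the next pass picks it up again.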

3. Solutions

3.1 Polling

According to the above solution, scheduled polling is the simplest and most direct solution.

(Figure: polling)

As shown above:

  1. The scheduled job periodically fetches the task list from the database
  2. It loops through the task list and executes each task
  3. It writes the task results back to the database

However, this approach can be optimized in many ways, such as:

  • If there are multiple instances, each instance fetches only a portion of the task list when the job starts, that is, it fetches the task list page by page. This requires that the task list can be paginated effectively and that tasks are spread evenly across the pages. For example, if the list is fetched by time, the tasks should be evenly distributed along the timeline.
  • Two instances should not execute the same task at the same time
  • A task must carry a state while it is being executed, so that if the instance holding the task dies, another instance can execute the task again
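
One way to combine these three points is to let each instance claim a page of tasks under a time-limited lease. The sketch below is an assumption-laden illustration, not the code the article inherited: the in-memory `table` stands in for the task table, and `claimPage`, `complete`, `owner` and `leaseExpiresAt` are hypothetical names. With a real database the claim would have to be a single conditional update rather than a `synchronized` method.

```java
import java.util.ArrayList;
import java.util.List;

public class LeasedPagingSketch {
    static final class Task {
        final String id;
        String owner;         // instance currently holding the task, or null
        long leaseExpiresAt;  // time (millis) after which the claim no longer counts
        boolean done;
        Task(String id) { this.id = id; }
    }

    // Hypothetical stand-in for the task table, ordered by creation time.
    static final List<Task> table = new ArrayList<>();

    /** Claims up to pageSize unfinished tasks that are unowned or whose lease has expired. */
    static synchronized List<Task> claimPage(String instanceId, int pageSize, long now, long leaseMillis) {
        List<Task> page = new ArrayList<>();
        for (Task t : table) {
            if (page.size() == pageSize) {
                break;
            }
            boolean free = t.owner == null || t.leaseExpiresAt <= now;
            if (!t.done && free) {
                t.owner = instanceId;                 // state: who is executing the task
                t.leaseExpiresAt = now + leaseMillis; // state: until when the claim is valid
                page.add(t);
            }
        }
        return page;
    }

    static synchronized void complete(Task t) {
        t.done = true;
        t.owner = null;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            table.add(new Task("t" + i));
        }
        List<Task> pageA = claimPage("A", 2, 0L, 1000L); // A takes t0, t1
        List<Task> pageB = claimPage("B", 2, 0L, 1000L); // B takes t2, t3, never A's tasks
        complete(pageA.get(0));                          // A finishes t0, then dies holding t1
        // After the lease has expired, another instance can pick up t1 again.
        System.out.println(claimPage("B", 1, 2000L, 1000L).get(0).id);
    }
}
```

The lease length is a trade-off: too short and a slow but healthy instance loses its tasks, too long and tasks held by a dead instance wait a long time before they are retried.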

This is how the code I inherited worked, except that its author did not paginate the task list. Under normal circumstances the task list is very short, fewer than 100 items, and it is fetched every 5 minutes, which runs without any problems. But once tasks pile up, every fetch returns all of them: imagine an instance pulling in 3000 tasks at once and then executing them one by one, so that the execution time stretches out indefinitely. To take advantage of the cluster’s ability to work together, the code was changed to the polling + listening style below.

3.2 Polling + Listening

The polling + listening approach also has its drawbacks.

(Figure: polling + listening)

As the figure above shows, this is clearly an upgrade of 3.1 (although, upgrade or not, it still does not work very well).

  1. The scheduled job periodically fetches the task list from the database
  2. It loops through the task list and filters out tasks that do not meet the requirements
  3. It writes the tasks that meet the requirements to ZooKeeper, creating task nodes under taskPath
  4. The Listener watches taskPath for child-node events. When it detects a task node creation event, it reads the node data from ZooKeeper and starts executing the task
  5. After the task is executed, the task status is written back to the database

This mode improves task execution efficiency. As long as the job schedule is set properly, tasks are randomly assigned to the listening instances and executed. The shortcomings of this scheme are the scheduled polling and the load on ZooKeeper:

  • Scheduled polling: For lack of time, the scheduled polling job was kept rather than removed. The only option is therefore to use a ZooKeeper distributed lock so that a single instance in the cluster reads the task list and writes the tasks to ZooKeeper. This is acceptable as long as no further problems appear.
  • ZooKeeper service pressure: Watching ZooKeeper nodes requires long-lived connections and frequent requests to ZooKeeper to confirm node status. If there are many task nodes and they stay around for a long time, the pressure on the ZooKeeper server is high. If the server can withstand that pressure, any change to a task node is sensed in near real time and can be responded to quickly.

3.3 Task Queue

Analyzing the shortcomings of the previous two schemes, together with earlier experience, points to the solution: the fastest way to work through a long list of tasks is to process it in batches, that is, to page through the task list and execute each page with multiple threads. (How many tasks to fetch per page and how many threads to use can only be worked out from the difficulty and period of the tasks at hand.) This has two parts:

  • Paging: The difficulty is that pages must be uniform and clearly marked, so that another instance does not fetch the same page again. The simplest data structure is a FIFO queue, which can be read sequentially. Since this is a clustered environment, the queue only needs to support exclusive reads (by deleting, by hiding, or by offset control).
  • Batch execution: The simplest way is to execute the tasks in parallel on multiple threads, which is not difficult.

The execution process is as follows:

(Figure: task queue)

  1. Producer writes task data to the database for backup or recording task status
  2. Producer writes task data to the task queue
  3. The consumer reads a page of tasks from the task queue and executes them in batches, then decides from the execution result whether each task must go back to the queue and wait for another attempt
  4. When a task succeeds, its status is written to the database
  5. If a task fails to be executed, it is written back to the task queue and waits to be read and executed again
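
The consumer side of these steps can be sketched as follows. This is a minimal single-process illustration: a `LinkedBlockingQueue` stands in for the distributed task queue and a set stands in for the status column in the database, and the names `TaskQueueSketch`, `consumePage` and `succeeded` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

public class TaskQueueSketch {
    // Hypothetical stand-ins: the shared task queue and the status column in the database.
    static final BlockingQueue<String> taskQueue = new LinkedBlockingQueue<>();
    static final Set<String> succeeded = ConcurrentHashMap.newKeySet();

    /** Reads one page, executes it on the pool, re-queues failures; returns the page size. */
    static int consumePage(int pageSize, ExecutorService pool, Predicate<String> work) {
        List<String> page = new ArrayList<>();
        taskQueue.drainTo(page, pageSize); // exclusive read: the page leaves the queue

        List<CompletableFuture<Void>> running = new ArrayList<>();
        for (String task : page) {
            running.add(CompletableFuture.runAsync(() -> {
                boolean ok;
                try {
                    ok = work.test(task);
                } catch (RuntimeException e) {
                    ok = false; // an exception counts as a failed task
                }
                if (ok) {
                    succeeded.add(task); // step 4: record success
                } else {
                    taskQueue.add(task); // step 5: back to the queue for another attempt
                }
            }, pool));
        }
        // Wait for the whole page before fetching the next one.
        CompletableFuture.allOf(running.toArray(new CompletableFuture[0])).join();
        return page.size();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            taskQueue.add("task-" + i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        consumePage(4, pool, task -> !task.equals("task-2")); // task-2 fails once
        pool.shutdown();
        System.out.println("succeeded=" + succeeded.size() + ", still queued=" + taskQueue.size());
    }
}
```

A failed task goes to the tail of the queue, so it is retried only after the tasks that were already waiting.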

Consider an exception: an instance’s consumer reads a page of tasks, the task queue removes those tasks, and then the instance dies. In this scenario the tasks held by that instance are lost. The dual task queue approach below solves this problem.

3.4 Dual-task Queue

Consider a worker on a production line who takes a set of parts from a conveyor belt for inspection. Parts that fail inspection are put back at the end of the line to wait for the machine to reprocess them; parts that pass are boxed. The conveyor belt is the task queue and the employees are the consumers. Once an employee has picked up a set of parts, those parts are no longer on the belt, which means the tasks are acquired exclusively. Boxing a qualified part means the task succeeded; putting an unqualified part back on the belt means the task failed. So far this matches the scheme above.

Suppose an employee picks up a set of parts, inspects half of them, boxing some and sending others back, and then suddenly quits and leaves. At this point the workstation is littered with uninspected parts. If someone patrols the workstations and finds one unattended with parts scattered on it, he simply puts those parts back on the conveyor belt, and they can be inspected properly again.

Applying the above example to our solution, it is a dual task queue model, as shown in the figure below:

(Figure: dual task queue)

  1. Producer writes task data to the database for backup or recording task status
  2. Producer writes task data to the task queue
  3. The consumer reads a page of tasks from the task queue
  4. The consumer writes that page of tasks to the second queue to prevent task loss
  5. When a task succeeds, its status is written to the database
  6. If a task fails, it is written back to the task queue and waits to be read and executed again
  7. A scheduled job checks the second task queue for ownerless tasks
  8. The scheduled job hands the ownerless tasks it finds in the second queue back to the producer, which writes them to the task queue again
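
The model can be sketched in memory as follows. Several things here are assumptions made for illustration: a task counts as ownerless once it has sat in the second queue longer than a timeout, the patrol re-queues tasks directly instead of going through the producer, and the names `DualQueueSketch`, `take`, `finish` and `patrol` are hypothetical. The `synchronized` methods stand in for whatever atomic operations the real queue offers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DualQueueSketch {
    // Primary task queue (the conveyor belt).
    static final Deque<String> taskQueue = new ArrayDeque<>();
    // Second queue (the workstation): task -> time at which a consumer took it.
    static final Map<String, Long> inFlight = new LinkedHashMap<>();

    /** Steps 3 and 4: move up to pageSize tasks from the task queue into the second queue. */
    static synchronized List<String> take(int pageSize, long now) {
        List<String> page = new ArrayList<>();
        while (page.size() < pageSize && !taskQueue.isEmpty()) {
            String task = taskQueue.pollFirst();
            inFlight.put(task, now);
            page.add(task);
        }
        return page;
    }

    /** Steps 5 and 6: a finished task leaves the second queue; a failed one is re-queued. */
    static synchronized void finish(String task, boolean ok) {
        inFlight.remove(task);
        if (!ok) {
            taskQueue.addLast(task);
        }
    }

    /** Steps 7 and 8: return tasks that stayed in flight too long; reports how many. */
    static synchronized int patrol(long now, long timeoutMillis) {
        int returned = 0;
        Iterator<Map.Entry<String, Long>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() >= timeoutMillis) {
                String task = entry.getKey();
                it.remove();
                taskQueue.addLast(task);
                returned++;
            }
        }
        return returned;
    }

    public static void main(String[] args) {
        taskQueue.add("a");
        taskQueue.add("b");
        taskQueue.add("c");
        take(3, 0L);        // a consumer picks up all three parts
        finish("a", true);  // boxed
        finish("b", false); // sent back to the belt
        // The consumer leaves with "c" still on the workstation; the patrol returns it.
        System.out.println(patrol(1000L, 1000L));
        System.out.println(taskQueue);
    }
}
```

The timeout must be longer than the slowest legitimate task, otherwise the patrol hands a task that is still being processed to a second consumer.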

Consider this: if the task queue implements exclusive reads as read-and-delete, and the consumer’s instance dies after reading the data but before writing it to the second queue, the tasks are still lost. The safe way to get exclusive reads from the task queue is therefore masking or offset (displacement).

  • Masking means that when a consumer reads task data, the status of that data is changed so that other consumers can no longer see it. After the consumer confirms the data, it can be deleted or archived.
  • Offset (displacement) means recording the current read position as an offset and taking a lock. Other consumers wait while the current task is processed. When processing finishes, the offset is committed and other consumers can read the following data.

4. Task queue selection

4.1 RabbitMQ

In RabbitMQ, channel.basicConsume is used to listen for tasks in the queue. For safety, set the second parameter, autoAck, to false. Once a consumer reads a message, the message enters the Unacked state and other consumers cannot see it; calling channel.basicAck confirms the message, and it is then removed from the queue. If the consumer dies before acknowledging, the message is reset to Ready for consumption by other consumers. In this way, as mentioned above, tasks can be executed out of order.

To make full use of the resources of each instance in the cluster, each instance can start multiple threads that listen to the queue at the same time, that is, multiple consumers per instance, so that messages are consumed as quickly as possible. The following is a simple example that creates a connection to the RabbitMQ cluster, sends data to the RabbitMQ service with a producer, and consumes messages through a Consumer subscription.

Create a connection:

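import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
factory.setUsername("username");
factory.setPassword("password");
// Reconnect automatically after a network failure, retrying every 10 seconds
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);
factory.setConnectionTimeout(60); // in milliseconds
Address[] addressArray = new Address[]{new Address("127.0.0.1", 5672)};
// Thread pool on which consumer callbacks are dispatched
ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("rabbitMQ-thread-" + thread.getId());
        return thread;
    }
});
Connection conn = factory.newConnection(es, addressArray);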

A simple producer:

Channel channel = conn.createChannel();
// Arguments: exchange, routing key, mandatory flag, message properties, body
channel.basicPublish("someExChange", "someQueue", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "Hello, world!".getBytes());

In each thread, a consumer can have the following example code:

final Thread currentThread = Thread.currentThread();
try {
    final Channel channel = conn.createChannel();
    // autoAck is false, so every message must be acknowledged explicitly
    channel.basicConsume("someQueue", false, "someConsumerTag", new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
            String routingKey = envelope.getRoutingKey();
            String contentType = properties.getContentType();
            long deliveryTag = envelope.getDeliveryTag();
            String message = new String(body, "UTF-8");
            logger.info("threadName={}, routingKey={}, contentType={}, deliveryTag={}, message={}",
                    currentThread.getName(), routingKey, contentType, deliveryTag, message);
            // Task processing begins
            // ...
            // Task processing is complete
            // Acknowledge only this delivery (multiple = false)
            channel.basicAck(deliveryTag, false);
        }
    });
} catch (IOException e) {
    logger.error("Error occurred", e);
}

4.2 Kafka

Kafka is designed to store logs sequentially, so it can serve as an ordered queue for tasks that must run in order. Define a Topic with 20 partitions, and in each instance of the cluster start 5 threads to read as consumers. (To use resources efficiently, the number of partitions should be greater than or equal to the total number of consumer threads, so that no thread sits idle and wastes resources.)

To guarantee that when an instance dies, other instances can pick up its unfinished tasks, call ConsumerConnector.commitOffsets(true) at the end of each message’s processing to advance the offset. This is the offset (displacement) approach described earlier.

Kafka can be used flexibly, in either ordered or unordered mode:

  • Ordered: When writing data to Kafka with the producer, set a key. Kafka hashes the key to choose the partition, so a fixed key always maps to the same partition, and the consumer that reads it stays relatively fixed (relatively, because consumers are rebalanced from time to time, which may switch the consumer). In this way, tasks are executed in order. The downside is that only one instance in the cluster gets the right to read the data while the others wait. Only when that instance dies does another instance gain the right to continue what it left unfinished.
  • Unordered: When writing data with the producer, add a variable value to the key so that the data is evenly distributed among different partitions and consumers in different instances can read it.

Example producer code (the example uses ordered mode; for unordered mode, just vary the job-key to suit the actual situation):
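
The effect of the key on partition choice can be illustrated without a Kafka cluster. The sketch below is a simplification: Kafka’s default partitioner hashes the serialized key with murmur2, whereas this example uses `String.hashCode`, and the names `KeyPartitionSketch`, `partitionFor` and `partitionsUsed` are hypothetical. Only the principle carries over: a fixed key always lands on one partition, while varied keys spread across several.

```java
import java.util.HashSet;
import java.util.Set;

public class KeyPartitionSketch {
    /** Maps a key to a partition index. Simplified: not Kafka's actual hash function. */
    static int partitionFor(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    /** Collects the partitions hit when a variable suffix is appended to the key. */
    static Set<Integer> partitionsUsed(String baseKey, int variants, int partitions) {
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < variants; i++) {
            used.add(partitionFor(baseKey + "-" + i, partitions));
        }
        return used;
    }

    public static void main(String[] args) {
        int partitions = 20;
        // Ordered mode: the fixed key selects the same partition on every call.
        System.out.println(partitionFor("job-key", partitions) == partitionFor("job-key", partitions));
        // Unordered mode: varying the key spreads records over several partitions.
        System.out.println(partitionsUsed("job-key", 100, partitions).size());
    }
}
```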

import static org.apache.kafka.clients.producer.ProducerConfig.*;

Properties properties = new Properties();
properties.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ACKS_CONFIG, "all");// 0, 1, all
properties.put(BUFFER_MEMORY_CONFIG, "33554432");
properties.put(COMPRESSION_TYPE_CONFIG, "none");// none, gzip, snappy
properties.put(RETRIES_CONFIG, "0");
properties.put(BATCH_SIZE_CONFIG, "16384");
properties.put(CLIENT_ID_CONFIG, "someClientId");
properties.put(LINGER_MS_CONFIG, "0");
properties.put(MAX_REQUEST_SIZE_CONFIG, "1048576");
properties.put(RECEIVE_BUFFER_CONFIG, "32768");
properties.put(SEND_BUFFER_CONFIG, "131072");
properties.put(TIMEOUT_CONFIG, "30000");
properties.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// The fixed key "job-key" always maps to the same partition (ordered mode)
ProducerRecord<String, String> record = new ProducerRecord<>("mq-job-topic", "job-key", "{id:1}");
kafkaProducer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            logger.info("topic={}, partition={}, offset={}", metadata.topic(), metadata.partition(), metadata.offset());
        } else {
            logger.error("The producer failed to send a message", exception);
        }
    }
});
kafkaProducer.close();

Example consumer code:

Properties properties = new Properties();
properties.put("zookeeper.connect", "127.0.0.1:2181/kafka");
properties.put("fetch.message.max.bytes", "1048576");
properties.put("group.id", "someGroupId");
properties.put("auto.commit.enable", "false"); // offsets are committed manually below
properties.put("auto.offset.reset", "largest");// smallest, largest

// KafkaConsumerFactory is not a Kafka client class; presumably a project-local helper that builds the ConsumerConnector
final ConsumerConnector connector = new KafkaConsumerFactory(properties).build();
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put("mq-job-topic", 10); // request 10 streams, one per consumer thread
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = connector.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> kafkaStreams = messageStreams.get("mq-job-topic");
ExecutorService executorService = Executors.newFixedThreadPool(10, new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    }
});
for (final KafkaStream<byte[], byte[]> kafkaStream : kafkaStreams) {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
                try {
                    String key = new String(messageAndMetadata.key(), "UTF-8");
                    String message = new String(messageAndMetadata.message(), "UTF-8");
                    logger.info("message={}, key={}", message, key);
                    // Task processing begins
                    // ...
                    // Task processing is complete
                    // Commit the offset only after the task has been processed
                    connector.commitOffsets(true);
                } catch (Exception e) {
                    logger.error("Abnormal", e);
                }
            }
        }
    }, null);
}

5. Conclusion

Although this solution was not actually used in the project, it was verified in a demo. For the distributed queue, you can choose RabbitMQ (unordered tasks) or Kafka (ordered or unordered tasks) depending on your requirements, and there are many other options as well.

