This article shows an approach to running asynchronous tasks across multiple machines through sharding, implemented in Go.

Preface

Take an asynchronous task such as processing 100,000 orders. With PHP, you would configure a scheduled task, and that scheduled task runs on a single machine. With Go or Java, we also need some policy, such as a distributed lock, to make sure the task runs on only one machine. Some of you may ask, “Why can’t I just run the same task on multiple machines?” I can only say, “You have a lot of nerve. When multiple machines process the same data at the same time, you will die very badly.”

So, is there a way to run a task on multiple machines at the same time while avoiding the problem of several machines processing the same data? Here I introduce multi-machine sharding, a skill I recently picked up at work.

Application scenarios

Recently I have been migrating asynchronous tasks that process orders in the DB. The number of orders is large: 100,000 in a run is normal. If only one machine processes them, execution is very slow, so the work needs to be handled by multiple machines concurrently.

There is actually another solution: a single machine runs the task and pushes the work into a message queue, and a new interface consumes and processes the queued data. Because that interface belongs to a cluster-deployed service, processing is fast. But the design has to account for duplicate consumption of messages, multiple machines processing the same message at the same time, messages left unprocessed because of network failures, and so on. If you want to discuss concrete solutions, you are welcome to chat with me offline.

Multi-machine sharding

What is multi-machine sharding? Put plainly, you split the data into N shards and hand them out to the machines. For example, if we have 1,000 records and split them into 5 shards with a suitable strategy, each shard holds 200 records; with 5 machines, each machine can process its own 200 records.
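As a tiny illustration of the idea (the shard count and record IDs here are made up for the example), assigning a record to a shard can be as simple as taking its ID modulo the number of shards:

package main

import "fmt"

// shardFor maps a record ID to one of numShards shards by simple modulo.
func shardFor(recordID, numShards int64) int64 {
    return recordID % numShards
}

func main() {
    // With 5 shards, IDs 0..9 map to shards 0,1,2,3,4,0,1,2,3,4.
    for id := int64(0); id < 10; id++ {
        fmt.Printf("record %d -> shard %d\n", id, shardFor(id, 5))
    }
}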

So how does that work?

To illustrate this, let me set up a quick simulation:

  • The DB contains 20 records, with primary key IDs 0, 1, 2, 3… 19;
  • There are 3 machines, each running one thread for the task, 3 threads in total;
  • The data is divided into 10 shards of 2 records each, which are then distributed among the 3 threads.

Token access

If you divide the data into 10 shards, you have 10 tokens (number=10): 0, 1, 2… 9. The processing logic is as follows:

  • increase = redis.incr(key), then token = increase % number. On the first call increase = 1, so token = 1 % 10 = 1;
  • Construct tokenKey = key + token = key1, then try to take a distributed lock on tokenKey through Redis; if locking succeeds, return the token value;
  • If locking fails, call increase = redis.incr(key) again: increase = 2, token = 2 % 10 = 2, tokenKey = key2, then try the distributed lock again; return the token on success, and on failure repeat the steps above.

The Redis Incr command increments the numeric value stored in the key by one. If the key does not exist, the value of the key is initialized to 0 before the INCR operation is performed.
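Here is a minimal sketch of the increment-and-modulo step, using the go-redis client directly rather than the Redis wrapper used in the code later in this article (the address and key name are assumptions for illustration):

package main

import (
    "context"
    "fmt"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // assumed local Redis

    const number = 10 // number of shards / tokens
    // INCR treats a missing key as 0, so the first call returns 1.
    increase, err := rdb.Incr(ctx, "task_name").Result()
    if err != nil {
        panic(err)
    }
    token := increase % number
    fmt.Printf("increase=%d token=%d\n", increase, token)
    // Next, the caller would try a distributed lock on "task_name" + token,
    // and INCR again to get the next token if that lock is already held.
}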

Fetching DB data by shard

Once a machine's thread holds a token, it can fetch that shard's data. Suppose the DB table looks like this, with only 20 records:

orderId   productName   status
0         cable         0
1         keyboard      0
2         display       0
...       ...           ...
19        mouse         0

Here’s a look at the shard fetching process:

  • SELECT * FROM tableName WHERE orderId % 10 = token AND status = 0 LIMIT pageSize; for example, with pageSize = 100, the thread holding token 0 fetches the records with orderId = 0 and orderId = 10;
  • After fetching the shard's data, you can start processing it. Once processing is complete, set status to 1 so the records are not scanned again next time.

I will leave you with a question: if one record keeps failing, and every fetch returns that problem record first, what strategy would you use to fetch the rest of the data?

Hands-on simulation

Distributed locks are needed here; their code was covered in the earlier article “Redis implements distributed locks”. Let’s first look at the code for acquiring and releasing a token:

const NO_INDEX = 100
const REDIS_ALIAS_NAME = "jointly"
const TASK_NAME = "task_name"
const RANGE_DATA = int64(10)
const PAGE_SIZE = int64(2)

// Fragment (shard) task
type FragmentTask struct {
    RedisLock
}

// Get a token
func (f *FragmentTask) GetToken(processId int64) (tokenId int64, err error) {
    i := 0
    for {
        increase, err := redis.Incr(REDIS_ALIAS_NAME, TASK_NAME)
        if err != nil {
            return 0, err
        }
        tokenId := increase % RANGE_DATA
        lockKey := fmt.Sprintf("%s%d", TASK_NAME, tokenId) // e.g. "task_name1"
        if f.GetDistributeLock(lockKey, 60) {
            fmt.Printf("Get lock key success, processId:%d, tokenId:%d\n", processId, tokenId)
            return tokenId, nil
        }
        fmt.Printf("Get lock key conflict, processId:%d, tokenId:%d\n", processId, tokenId)

        i++
        // All RANGE_DATA tokens were tried and are locked by others: give up this round.
        if int64(i) >= RANGE_DATA {
            fmt.Printf("Begin a new cycle.\n")
            return NO_INDEX, nil
        }
    }
}

// Release the token lock
func (f *FragmentTask) ReleaseToken(tokenId int64) bool {
    lockKey := fmt.Sprintf("%s%d", TASK_NAME, tokenId)
    ret := f.DelDistributeLock(lockKey)
    if !ret {
        fmt.Printf("Release token failed, tokenId:%d\n", tokenId)
    }
    return ret
}


The token generation process was explained in detail in the previous section. The key point to note here is that GetToken only cycles through RANGE_DATA tokens per call: once it has tried the whole range without getting a free lock, it returns NO_INDEX and exits, and the caller's outer loop re-enters it after a pause.

Let’s look at the logic for fetching shard data through tokens:

func (*Order) QueryOrderList(rangeData, tokenId, pageSize int64) (data []OrderData, err error) {
    o := orm.NewOrm()
    o.Using("default")
    // Fetch one page of unprocessed orders belonging to this token's shard.
    _, err = o.Raw("SELECT * FROM tb_order WHERE status = 0 AND order_id % ? = ? LIMIT ?",
        rangeData, tokenId, pageSize).QueryRows(&data)
    if err != nil {
        return nil, err
    }
    return data, nil
}

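DoProcess below also calls order.UpdateOrderStatus, which is not shown in the article. A minimal sketch of what it might look like with the same beego ORM setup (the OrderId field name and the exact SQL are assumptions based on the query above):

func (*Order) UpdateOrderStatus(record *OrderData) error {
    o := orm.NewOrm()
    o.Using("default")
    // Mark the order as processed so the next scan skips it.
    _, err := o.Raw("UPDATE tb_order SET status = ? WHERE order_id = ?",
        record.Status, record.OrderId).Exec()
    return err
}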

The following is the task processing flow of a single thread:

// Handle the task
func (f *FragmentTask) DoProcess(processId int64) error {
    order := &db.Order{}

    for {
        tokenId, err := f.GetToken(processId)
        if err != nil {
            fmt.Printf("Get token failed, exit!\n")
            return err
        }

        // All tokens are locked; sleep for a while, then retry
        if tokenId == NO_INDEX {
            fmt.Printf("All token is conflict, sleep for a while.\n")
            time.Sleep(time.Second * 8)
            continue
        }

        orderList, err := order.QueryOrderList(RANGE_DATA, tokenId, PAGE_SIZE)
        if err != nil {
            fmt.Printf("Query order list failed, tokenId:%d, err:%s\n", tokenId, err.Error())
            f.ReleaseToken(tokenId)
            continue
        }
        fmt.Printf("Begin to process, processId:%d, tokenId:%d, print orderList:%v\n", processId, tokenId, orderList)

        // Process the records; simulated here with a sleep
        time.Sleep(time.Second * 1)

        // Update the DB record status after processing the data
        for _, orderRecord := range orderList {
            orderRecord.Status = 1
            order.UpdateOrderStatus(&orderRecord)
        }

        f.ReleaseToken(tokenId)
    }
}


There is nothing unclear in this logic; the data fetch is just the SQL query shown earlier. Finally, the multi-threaded part: we run only 3 threads, simulating 3 machines (assuming each machine runs a single thread):

// Test the sharded task
func FragmentTest(fragmentTask *redis.FragmentTask) {
    // Start 3 threads (simulating 3 machines) to process the task
    for i := 0; i <= 2; i++ {
        go fragmentTask.DoProcess(int64(i))
    }
    // Sleep in the main thread for a while so the goroutines are not cut short
    time.Sleep(time.Second * 100)
}

func main() {
    redisLock := &redis.RedisLock{}
    order := &db.Order{}
    fragmentTask := &redis.FragmentTask{}

    // Initialize the resources
    redisLock.IntiRedis()
    order.InitDb()

    // Run the sharded-task test
    FragmentTest(fragmentTask)
}


The DB contains the following test data, with order_id running from 1 to 20:

mysql> select * from tb_order;
+----+----------+--------------+--------+
| id | order_id | product_name | status |
+----+----------+--------------+--------+
|  1 |        1 | mouse 1      |      0 |
|  2 |        2 | mouse 2      |      0 |
|  3 |        3 | mouse 3      |      0 |
|  4 |        4 | mouse 4      |      0 |
|  5 |        5 | mouse 5      |      0 |
|  6 |        6 | mouse 6      |      0 |
|  7 |        7 | mouse 7      |      0 |
|  8 |        8 | mouse 8      |      0 |
|  9 |        9 | mouse 9      |      0 |
| 10 |       10 | mouse 10     |      0 |
| 11 |       11 | mouse 11     |      0 |
| 12 |       12 | mouse 12     |      0 |
| 13 |       13 | mouse 13     |      0 |
| 14 |       14 | mouse 14     |      0 |
| 15 |       15 | mouse 15     |      0 |
| 16 |       16 | mouse 16     |      0 |
| 17 |       17 | mouse 17     |      0 |
| 18 |       18 | mouse 18     |      0 |
| 19 |       19 | mouse 19     |      0 |
| 20 |       20 | mouse 20     |      0 |
+----+----------+--------------+--------+


Look directly at the execution results:

Get lock key success, processId:0, tokenId:1
Get lock key success, processId:1, tokenId:2
Get lock key success, processId:2, tokenId:3
Begin to process, processId:0, tokenId:1, print orderList:[{1 1 mouse 1 0} {11 11 mouse 11 0}]
Begin to process, processId:2, tokenId:3, print orderList:[{3 3 mouse 3 0} {13 13 mouse 13 0}]
Begin to process, processId:1, tokenId:2, print orderList:[{2 2 mouse 2 0} {12 12 mouse 12 0}]
Get lock key success, processId:0, tokenId:4
Begin to process, processId:0, tokenId:4, print orderList:[{4 4 mouse 4 0} {14 14 mouse 14 0}]
Get lock key success, processId:1, tokenId:5
Begin to process, processId:1, tokenId:5, print orderList:[{5 5 mouse 5 0} {15 15 mouse 15 0}]
Get lock key success, processId:2, tokenId:6
Begin to process, processId:2, tokenId:6, print orderList:[{6 6 mouse 6 0} {16 16 mouse 16 0}]
Get lock key success, processId:0, tokenId:7
Begin to process, processId:0, tokenId:7, print orderList:[{7 7 mouse 7 0} {17 17 mouse 17 0}]
Get lock key success, processId:1, tokenId:8
Begin to process, processId:1, tokenId:8, print orderList:[{8 8 mouse 8 0} {18 18 mouse 18 0}]
Get lock key success, processId:2, tokenId:9
Begin to process, processId:2, tokenId:9, print orderList:[{9 9 mouse 9 0} {19 19 mouse 19 0}]
Get lock key success, processId:0, tokenId:0
Begin to process, processId:0, tokenId:0, print orderList:[{10 10 mouse 10 0} {20 20 mouse 20 0}]
Get lock key success, processId:1, tokenId:1
Begin to process, processId:1, tokenId:1, print orderList:[]
Get lock key success, processId:2, tokenId:2
Begin to process, processId:2, tokenId:2, print orderList:[]
Get lock key success, processId:0, tokenId:3
Get lock key success, processId:1, tokenId:4
Begin to process, processId:1, tokenId:4, print orderList:[]
Get lock key success, processId:2, tokenId:5
Begin to process, processId:0, tokenId:3, print orderList:[]
Begin to process, processId:2, tokenId:5, print orderList:[]


Take the thread holding tokenId=7 as an example: it fetches [{7 7 mouse 7 0} {17 17 mouse 17 0}] from the DB. The order_id values of these two records are 7 and 17; since the data is split into 10 shards, we take order_id modulo 10, and that value equals the tokenId. In the first pass, thread 0 gets tokens 1, 4, 7 and 0, thread 1 gets tokens 2, 5 and 8, and thread 2 gets tokens 3, 6 and 9. Now let's set PAGE_SIZE to 1 and see what happens:

Get lock key success, processId:0, tokenId:9
Get lock key success, processId:1, tokenId:0
Get lock key success, processId:2, tokenId:1
Begin to process, processId:0, tokenId:9, print orderList:[{9 9 mouse 9 0}]
Begin to process, processId:2, tokenId:1, print orderList:[{1 1 mouse 1 0}]
Begin to process, processId:1, tokenId:0, print orderList:[{10 10 mouse 10 0}]
Get lock key success, processId:0, tokenId:2
Begin to process, processId:0, tokenId:2, print orderList:[{2 2 mouse 2 0}]
Get lock key success, processId:1, tokenId:3
Begin to process, processId:1, tokenId:3, print orderList:[{3 3 mouse 3 0}]
Get lock key success, processId:2, tokenId:4
Begin to process, processId:2, tokenId:4, print orderList:[{4 4 mouse 4 0}]
Get lock key success, processId:0, tokenId:5
Begin to process, processId:0, tokenId:5, print orderList:[{5 5 mouse 5 0}]
Get lock key success, processId:1, tokenId:6
Begin to process, processId:1, tokenId:6, print orderList:[{6 6 mouse 6 0}]
Get lock key success, processId:2, tokenId:7
Begin to process, processId:2, tokenId:7, print orderList:[{7 7 mouse 7 0}]
Get lock key success, processId:0, tokenId:8
Get lock key success, processId:1, tokenId:9
Begin to process, processId:0, tokenId:8, print orderList:[{8 8 mouse 8 0}]
Begin to process, processId:1, tokenId:9, print orderList:[{19 19 mouse 19 0}]
Get lock key success, processId:2, tokenId:0
Begin to process, processId:2, tokenId:0, print orderList:[{20 20 mouse 20 0}]
Get lock key success, processId:0, tokenId:1
Get lock key success, processId:1, tokenId:2
Begin to process, processId:0, tokenId:1, print orderList:[{11 11 mouse 11 0}]
Begin to process, processId:1, tokenId:2, print orderList:[{12 12 mouse 12 0}]
Get lock key success, processId:2, tokenId:3
Begin to process, processId:2, tokenId:3, print orderList:[{13 13 mouse 13 0}]
Get lock key success, processId:0, tokenId:4
Get lock key success, processId:1, tokenId:5
Begin to process, processId:0, tokenId:4, print orderList:[{14 14 mouse 14 0}]
Begin to process, processId:1, tokenId:5, print orderList:[{15 15 mouse 15 0}]
Get lock key success, processId:2, tokenId:6
Begin to process, processId:2, tokenId:6, print orderList:[{16 16 mouse 16 0}]
Get lock key success, processId:0, tokenId:7
Get lock key success, processId:1, tokenId:8
Begin to process, processId:0, tokenId:7, print orderList:[{17 17 mouse 17 0}]
Begin to process, processId:1, tokenId:8, print orderList:[{18 18 mouse 18 0}]
Get lock key success, processId:2, tokenId:9
Begin to process, processId:2, tokenId:9, print orderList:[]


Seeing this, isn't it interesting? You can do it too ~~

Limitations

This multi-machine sharding approach does not fit all asynchronous tasks; it only suits certain scenarios:

  • It generally operates on DB data, because a DB fits this sharding method well; the example above is a good illustration;
  • Every processed record needs its changed state written back to the DB. Tasks that do not record a completion state, for example sending a Push to every user in a table where the DB never records whether the Push was sent and the rows are simply traversed from front to back, are not well supported by this sharding method at the moment. (It can still work if you must, as long as you design a scheme that slices the data into suitable shards so that no machine misses any data;)
  • The current solution only supports long-running resident tasks, that is, tasks that keep executing in memory and never pause. You could also design pausable tasks, for example by recording whether all the data for each token has been processed and, if so, suspending for a period of time before starting again.

Known pitfalls

  • When you acquire a distributed lock, you set a timeout on it. If the timeout is too long and the machine holding the lock dies, that shard will not be processed again for a long time, not until the lock expires on its own. If the timeout is too short, the shard may not be finished before the lock expires and is acquired by another thread, so two threads end up executing the same shard, which goes against our design. (Is there a perfect solution? There is; you are welcome to add me on WeChat to discuss it. A rough sketch of one possible direction follows below.)
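As a sketch of one possible direction (not necessarily the solution hinted at above): keep the lock timeout short, and let the holder keep renewing the TTL in a background goroutine while it is still processing the shard. The renew callback here is hypothetical; it would need to reset the TTL only when this instance still owns the lock (for example via a value check in a Lua script):

// keepLockAlive periodically calls renew until stop() is invoked.
// renew is a hypothetical callback that extends the lock's TTL only if
// this instance still owns the lock.
func keepLockAlive(renew func(), interval time.Duration) (stop func()) {
    done := make(chan struct{})
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-done:
                return
            case <-ticker.C:
                renew()
            }
        }
    }()
    return func() { close(done) }
}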

Afterword

From common rate-limiting approaches, to Redis distributed locks, and now to running asynchronous tasks across multiple machines, I have finally written the pieces I wanted to write. The process of producing content for others is really also my own process of growth. Recently I have been studying message queues and design patterns. The theory part of message queues is done; after a month of sorting it out, my head feels ready to burst. For the design patterns part, I plan to focus on actual projects first and then go over the common design patterns.

If you enjoyed this, please give it a like. For more articles, follow the WeChat official account “Lou Zai advanced road” — hit follow and don’t get lost ~~