Preface

Day-to-day development involves many asynchronous, batch, scheduled, and delayed tasks. In go-zero, the recommended tool for these is go-queue. go-queue is itself built on go-zero and offers two modes:

  • dq: depends on beanstalkd; suitable for delayed and scheduled task execution;
  • kq: depends on Kafka; suitable for asynchronous and batch task execution.

This article starts with dq and works step by step through the logic behind go-queue.

Introduction to dq

dq wraps the underlying beanstalkd operations and adds distributed storage plus delayed and scheduled execution. When the service restarts, pending messages are not lost and still run, because beanstalkd stores and hands out the messages.

dq is very simple to use. It relies on Redis SETNX to ensure that each message is consumed only once. Redis is not used to store messages on the producer side, because storage is left to beanstalkd, as described above.
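The "consume only once" idea can be illustrated without a Redis server. The sketch below is hypothetical and is not dq's code. memStore and its setNX method stand in for Redis SETNX, which stores a key only if it is absent and reports whether this call stored it. The message keys are made up. Real Redis SETNX is atomic, so only one of several racing consumers wins; here a mutex plays that role.

```go
package main

import (
	"fmt"
	"sync"
)

// memStore is a hypothetical in-memory stand-in for Redis.
type memStore struct {
	mu   sync.Mutex
	keys map[string]struct{}
}

func newMemStore() *memStore {
	return &memStore{keys: make(map[string]struct{})}
}

// setNX mimics Redis SETNX: it stores key only if it is absent
// and reports whether this call was the one that stored it.
func (s *memStore) setNX(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.keys[key]; ok {
		return false
	}
	s.keys[key] = struct{}{}
	return true
}

// consumeOnce runs handle only for the first delivery of a message key,
// so duplicate copies (e.g. the same job read from two beanstalkd nodes) are skipped.
func consumeOnce(s *memStore, key string, handle func()) bool {
	if !s.setNX(key) {
		return false
	}
	handle()
	return true
}

func main() {
	store := newMemStore()
	count := 0
	// The same logical message arrives twice, e.g. from two replica nodes.
	for _, key := range []string{"job-1000", "job-1000", "job-1001"} {
		consumeOnce(store, key, func() { count++ })
	}
	fmt.Println(count) // 2
}
```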

With that brief overview of dq's overall architecture, let's start exploring in earnest.

Producer example

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/tal-tech/go-queue/dq"
)

func main() {
	// One entry per beanstalkd node; both nodes use the same tube.
	producer := dq.NewProducer([]dq.Beanstalk{
		{
			Endpoint: "localhost:11300",
			Tube:     "tube",
		},
		{
			Endpoint: "localhost:11301",
			Tube:     "tube",
		},
	})

	for i := 1000; i < 1005; i++ {
		// Delay: execute after the given delay
		_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
		// At: execute at a given point in time
		// _, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*5))
		if err != nil {
			fmt.Println(err)
		}
	}
}

Usage comes down to two steps:

  1. NewProducer(opts): passes each beanstalkd node's endpoint (host:port) and tube (topic) configuration to the producer;
  2. producer.Delay(): call Delay() on the newly created producer, passing in the message to be sent asynchronously along with how long to delay its execution.

The returned producer is an interface, and Delay() is one of its methods. Its other methods and internal design are covered below. Let's keep exploring.

Dive into the producer execution process

Let's trace the full call chain, starting from the example.

Initialization

dq.NewProducer([]dq.Beanstalk{{opt1}, {opt2}, ...})    // initialize the producer
	|- NewProducerNode(endpoint, tube)                  // endpoint and tube come from the config array passed in
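One NewProducer call fans out into one node per configured endpoint. The sketch below shows that fan-out. The names mirror the call chain above, but the fields and bodies are simplified assumptions, not go-queue's source. No network connection is made, matching the lazy initialization described below.

```go
package main

import "fmt"

// Beanstalk mirrors the config entries passed to dq.NewProducer in the example.
type Beanstalk struct {
	Endpoint string
	Tube     string
}

// producerNode is a simplified, hypothetical stand-in for dq's per-node producer.
type producerNode struct {
	endpoint string
	tube     string
}

// producerCluster holds one node per configured beanstalkd endpoint.
type producerCluster struct {
	nodes []*producerNode
}

// newProducerNode only records the endpoint and tube; no connection is opened here.
func newProducerNode(endpoint, tube string) *producerNode {
	return &producerNode{endpoint: endpoint, tube: tube}
}

// newProducer builds one node for every config entry.
func newProducer(beanstalks []Beanstalk) *producerCluster {
	nodes := make([]*producerNode, 0, len(beanstalks))
	for _, b := range beanstalks {
		nodes = append(nodes, newProducerNode(b.Endpoint, b.Tube))
	}
	return &producerCluster{nodes: nodes}
}

func main() {
	p := newProducer([]Beanstalk{
		{Endpoint: "localhost:11300", Tube: "tube"},
		{Endpoint: "localhost:11301", Tube: "tube"},
	})
	for _, n := range p.nodes {
		fmt.Println(n.endpoint, n.tube)
	}
}
```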

Next comes producerNode.go, which initializes the beanstalk connection:

NewProducerNode(endpoint, tube)
	|- conn: newConnection(endpoint, tube)
		|- return &connection{}	

This is where beanstalk comes in: connection.conn -> *beanstalk.

However, newConnection() does not initialize the underlying beanstalk conn. It is initialized lazily, on first use.
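Lazy initialization means the dial happens on the first get(), not in newConnection(). Below is a minimal sketch of the pattern. It makes three assumptions: a mutex-guarded cached field, a hypothetical client type, and a dial function injected so the sketch runs without a beanstalkd server. go-queue's real connection wraps the beanstalk client library, which this sketch does not use.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// client is a hypothetical stand-in for a beanstalk client connection.
type client struct{ endpoint string }

// connection dials lazily: newConnection only stores the settings.
type connection struct {
	mu       sync.Mutex
	endpoint string
	tube     string
	conn     *client
	dial     func(endpoint string) (*client, error) // injected so no server is needed
}

func newConnection(endpoint, tube string, dial func(string) (*client, error)) *connection {
	return &connection{endpoint: endpoint, tube: tube, dial: dial}
}

// get returns the cached connection, dialing only on first use (or after reset).
func (c *connection) get() (*client, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.conn == nil {
		cl, err := c.dial(c.endpoint)
		if err != nil {
			return nil, err
		}
		c.conn = cl
	}
	return c.conn, nil
}

// reset drops the cached connection so the next get() dials again.
// A real implementation would also close the old connection.
func (c *connection) reset() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.conn = nil
}

func main() {
	dials := 0
	dial := func(ep string) (*client, error) {
		dials++
		if ep == "" {
			return nil, errors.New("empty endpoint")
		}
		return &client{endpoint: ep}, nil
	}
	c := newConnection("localhost:11300", "tube", dial)
	fmt.Println(dials) // 0: nothing dialed yet
	c.get()
	c.get()
	fmt.Println(dials) // 1: the second get reuses the connection
	c.reset()
	c.get()
	fmt.Println(dials) // 2: reset forces a fresh dial
}
```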

Delay

Delay(data, timesecond) inserts the message into the queue. So what exactly does Delay() do?

p.Delay(data, timesecond)
	|- p.wrap(data, time)                // wrap data and time together
		|- p.insert(nodeFn)
			|- node.Delay()              // for range p.nodes: execute Delay() on each node

The data wrapped in the previous step is passed to every node in the producer cluster p, and node.Delay() is executed on each one.

As mentioned under initialization, conn is not created up front. It therefore has to be initialized before any data can be inserted.
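The fan-out inside insert() can be sketched as below. Following the article's description of Revoke in the interface section, the sketch treats a write as successful only when at least two nodes accept it. Otherwise it revokes the copies already written and returns an error. The node type, the comma-joined id format, and the error message are assumptions. The sketch also visits the nodes sequentially for simplicity.

```go
package main

import (
	"fmt"
	"strings"
)

// minWrittenNodes reflects the article's note that fewer than 2 successful writes counts as failure.
const minWrittenNodes = 2

// node is a hypothetical per-node producer: delay returns a job id or an error.
type node struct {
	name    string
	fail    bool
	revoked []string
}

func (n *node) delay(body []byte) (string, error) {
	if n.fail {
		return "", fmt.Errorf("%s: put failed", n.name)
	}
	return n.name + "-1", nil
}

// insert runs fn on every node and joins the successful ids.
// If too few nodes accepted the job, the written copies are revoked.
func insert(nodes []*node, fn func(*node) (string, error)) (string, error) {
	var ids []string
	var written []*node
	for _, n := range nodes {
		id, err := fn(n)
		if err != nil {
			continue
		}
		ids = append(ids, id)
		written = append(written, n)
	}
	if len(ids) >= minWrittenNodes {
		return strings.Join(ids, ","), nil
	}
	// Too few copies: undo the partial write.
	for i, n := range written {
		n.revoked = append(n.revoked, ids[i])
	}
	return "", fmt.Errorf("only %d of %d nodes accepted the job", len(ids), len(nodes))
}

func main() {
	body := []byte("1000")
	ok := []*node{{name: "a"}, {name: "b", fail: true}, {name: "c"}}
	id, err := insert(ok, func(n *node) (string, error) { return n.delay(body) })
	fmt.Println(id, err) // a-1,c-1 <nil>

	bad := []*node{{name: "a"}, {name: "b", fail: true}}
	_, err = insert(bad, func(n *node) (string, error) { return n.delay(body) })
	fmt.Println(err, bad[0].revoked) // only 1 of 2 nodes accepted the job [a-1]
}
```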

node.Delay()                                    // execute Delay() on each node in the config
	|- node.conn.get()                          // get conn from the node (initialized here on first use)
	|- _, err := conn.Put(data, delay, opts...)
		|- node.conn.reset()                    // on errors such as OOM/timeout, reset the conn to avoid leaking it

Put(data, delay):

tube.Put(data, delay)
	|- tube.Conn.cmd("put", ...)    // the producer publishes the job

The producer's Put is sent to beanstalkd as its put command:

put <pri> <delay> <ttr> <bytes>\r\n<data>\r\n
  • <pri>: priority. A smaller value means a higher priority. The default value is 1024;
  • <delay>: the number of seconds to wait before the job becomes ready. During this time the job is in the delayed state;
  • <ttr>: time to run, the maximum number of seconds a worker is allowed to spend on the job. If the worker does not delete, release, or bury the job within this time, the server releases the job automatically when it times out;
  • <bytes>: the length of the job body, not including the trailing \r\n;
  • <data>: the job body data.

OK. So what does the server reply once the job is inserted?

INSERTED <id>\r\n

The returned id is the job's id. That completes the analysis of put. Now back to the code:

tube.Put(data, priority, delay, ttr)
	|- tube.Conn.cmd("put", ...)
	|- tube.Conn.readResp("INSERTED id")
	|- return id, err                // return the job id
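The readResp step amounts to checking that the reply is INSERTED and parsing the id that follows. Below is a minimal sketch using only the standard library. The function name parseInserted is hypothetical. Any other reply, such as BURIED <id> or JOB_TOO_BIG, is treated as an error here.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseInserted extracts the job id from a beanstalkd "INSERTED <id>\r\n" reply.
// Any other reply (e.g. BURIED <id>, JOB_TOO_BIG) is returned as an error.
func parseInserted(reply string) (uint64, error) {
	line := strings.TrimSuffix(reply, "\r\n")
	fields := strings.Fields(line)
	if len(fields) != 2 || fields[0] != "INSERTED" {
		return 0, fmt.Errorf("unexpected reply %q", line)
	}
	return strconv.ParseUint(fields[1], 10, 64)
}

func main() {
	id, err := parseInserted("INSERTED 42\r\n")
	fmt.Println(id, err) // 42 <nil>
	_, err = parseInserted("JOB_TOO_BIG\r\n")
	fmt.Println(err) // unexpected reply "JOB_TOO_BIG"
}
```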

That covers the producer operations visible directly in the example. The rest of the interface is easier to explain:

Producer interface

Besides the Delay() used in the example, the interface has several other methods:

type Producer interface {
	At(body []byte, at time.Time) (string, error)
	Close() error
	Delay(body []byte, delay time.Duration) (string, error)
	Revoke(ids string) error
}
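  • At: like Delay(), but takes a specific point in time at which to execute;
  • Close: closes all node connections;
  • Delay: delays execution by the given duration;
  • Revoke: deletes the jobs identified by ids. It is used when a job is written to fewer than the minimum number of nodes (2). In that case the insert is treated as failed, and the jobs already written are deleted.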

In practice, developers using dq only need At/Delay. All you have to decide is whether your task is scheduled or delayed. dq's internal wrapping takes care of the rest.

Project addresses

Github.com/tal-tech/go…

go-queue also makes extensive use of fx, go-zero's streaming library.

Github.com/tal-tech/go…

You're welcome to use go-queue and give it a star to support us! Let's build the go-zero ecosystem together! 👍