Preface
Day-to-day development involves many asynchronous, batch, scheduled, and delayed tasks. In go-zero, go-queue is the recommended tool for handling them. go-queue is itself built on go-zero and offers two modes:
dq: depends on beanstalkd, suitable for delayed and scheduled task execution;
kq: depends on kafka, suitable for asynchronous and batch task execution.
This article starts with dq and gradually explores the logic behind go-queue.
Introduction to dq
dq wraps the underlying beanstalkd operations and adds distributed storage together with delayed and scheduled execution. Jobs can be re-executed after the service restarts, and messages are not lost, because message handling is delegated to beanstalkd.
dq is very simple to use, and it relies on Redis SETNX to ensure that each message is consumed only once. Redis is not used for message storage on the producer side, however; as described above, that job is left to beanstalkd.
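The once-only guarantee rests on set-if-not-exists semantics. Here is a minimal sketch of that idea, assuming an in-memory map as a stand-in for Redis. The names `onceGuard`, `setNX`, `consume`, and the `dq:` key prefix are hypothetical and are not taken from dq's source.

```go
package main

import (
	"fmt"
	"sync"
)

// onceGuard is a hypothetical in-memory stand-in for Redis SETNX.
type onceGuard struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newOnceGuard() *onceGuard {
	return &onceGuard{seen: make(map[string]struct{})}
}

// setNX stores key and returns true only if key was not present before.
func (g *onceGuard) setNX(key string) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if _, ok := g.seen[key]; ok {
		return false
	}
	g.seen[key] = struct{}{}
	return true
}

// consume runs handle only for the first delivery of a given message body.
func consume(g *onceGuard, body string, handle func(string)) bool {
	if !g.setNX("dq:" + body) { // assumed key format, for illustration only
		return false // an earlier delivery already claimed this message
	}
	handle(body)
	return true
}

func main() {
	g := newOnceGuard()
	handled := 0
	for i := 0; i < 3; i++ { // the same message is delivered three times
		consume(g, "1000", func(string) { handled++ })
	}
	fmt.Println("handled:", handled)
}
```

A real deployment would also give each key an expiry so the set does not grow without bound; that detail is left out of the sketch.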
With the overall architecture of dq briefly introduced, let's dig into the details.
Producer example
// Assumes imports of fmt, strconv, time, and the go-queue dq package.
func main() {
	producer := dq.NewProducer([]dq.Beanstalk{
		{
			Endpoint: "localhost:11300",
			Tube:     "tube",
		},
		{
			Endpoint: "localhost:11301",
			Tube:     "tube",
		},
	})

	for i := 1000; i < 1005; i++ {
		// Delay: execute after a delay
		_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
		// At: execute at a specific point in time
		// _, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*5))
		if err != nil {
			fmt.Println(err)
		}
	}
}
In terms of usage, there are two steps:
NewProducer(opts): passes the beanstalkd endpoint and tube configuration to the producer;
producer.Delay(): calls Delay() on the newly created producer, passing in the message to be sent asynchronously together with the delay before execution.
The producer that is created is an interface, and Delay() is one of its methods. The other methods and the internal design are covered later. For now, let's keep exploring.
Dive into the producer execution process
Let's trace the entire call chain, starting from the example.
Initialization
dq.NewProducer([]dq.Beanstalk{{opt1}, {opt2}, ...}) // initialize the producer
    |- NewProducerNode(endpoint, tube) // endpoint and tube come from the configuration array passed in
Next comes producerNode.go, which is where the beanstalk connection is set up:
NewProducerNode(endpoint, tube)
    |- conn: newConnection(endpoint, tube)
        |- return &connection{}
This is where beanstalk comes in: connection.conn is a *beanstalk.Conn.
However, that conn is not created in newConnection(); it is initialized lazily.
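Lazy initialization of this kind can be sketched as follows. This is only an illustration under assumptions: `conn`, `lazyConn`, and the injected `dial` function are hypothetical stand-ins so that the example runs without a beanstalkd server, and they do not reproduce dq's actual connection type.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for an established beanstalkd connection.
type conn struct{ endpoint string }

// lazyConn defers dialing until the first get() call.
type lazyConn struct {
	mu       sync.RWMutex
	endpoint string
	dial     func(endpoint string) (*conn, error) // injected so no network is needed
	c        *conn
}

// newLazyConn only records the configuration; it does not dial.
func newLazyConn(endpoint string, dial func(string) (*conn, error)) *lazyConn {
	return &lazyConn{endpoint: endpoint, dial: dial}
}

// get returns the cached connection, dialing once if none exists yet.
func (l *lazyConn) get() (*conn, error) {
	l.mu.RLock()
	c := l.c
	l.mu.RUnlock()
	if c != nil {
		return c, nil
	}

	l.mu.Lock()
	defer l.mu.Unlock()
	if l.c == nil { // re-check: another goroutine may have dialed in the meantime
		c, err := l.dial(l.endpoint)
		if err != nil {
			return nil, err
		}
		l.c = c
	}
	return l.c, nil
}

// reset drops the cached connection so that the next get() dials again.
// A real implementation would also close the underlying connection here.
func (l *lazyConn) reset() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.c = nil
}

func main() {
	dials := 0
	lc := newLazyConn("localhost:11300", func(ep string) (*conn, error) {
		dials++
		if ep == "" {
			return nil, errors.New("empty endpoint")
		}
		return &conn{endpoint: ep}, nil
	})
	fmt.Println("dials after construction:", dials)
	lc.get()
	lc.get()
	fmt.Println("dials after two gets:", dials)
	lc.reset()
	lc.get()
	fmt.Println("dials after reset and get:", dials)
}
```

The read lock keeps the common path cheap once the connection exists, while the second check under the write lock prevents two goroutines from dialing at the same time.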
Delay
Delay(data, timesecond) is used to insert the message into the internal queue. So what does Delay() do?
p.Delay(data, timesecond)
    |- p.wrap(data, time) // wrap data and time together
    |- p.insert(nodeFn)
        |- node.Delay() // iterate over p's nodes and execute `Delay()` on each one
The data wrapped in the previous step is passed to every node of p (the cluster), and node.Delay() is executed on each of them.
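The wrap-then-fan-out step can be sketched roughly like this. The `node` interface, `fakeNode`, the `<unix-nanos>/<body>` layout produced by `wrap`, and the comma used to join ids are all assumptions made for illustration; the call chain above does not specify them.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// node is a hypothetical minimal view of a single producer node.
type node interface {
	Delay(body []byte, delay time.Duration) (string, error)
}

// fakeNode stands in for a beanstalkd-backed node and hands out sequential ids.
type fakeNode struct {
	name string
	next int
}

func (n *fakeNode) Delay(body []byte, delay time.Duration) (string, error) {
	n.next++
	return n.name + "/" + strconv.Itoa(n.next), nil
}

// wrap prefixes the body with the execution timestamp.
// The "<unix-nanos>/<body>" layout is an assumption of this sketch.
func wrap(body []byte, atUnixNano int64) []byte {
	return append([]byte(strconv.FormatInt(atUnixNano, 10)+"/"), body...)
}

// insert runs fn against every node and joins the ids of the successful writes.
func insert(nodes []node, fn func(node) (string, error)) (string, []error) {
	var ids []string
	var errs []error
	for _, n := range nodes {
		id, err := fn(n)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		ids = append(ids, id)
	}
	return strings.Join(ids, ","), errs
}

func main() {
	nodes := []node{&fakeNode{name: "a"}, &fakeNode{name: "b"}}
	wrapped := wrap([]byte("1000"), time.Now().Add(5*time.Second).UnixNano())
	ids, errs := insert(nodes, func(n node) (string, error) {
		return n.Delay(wrapped, 5*time.Second)
	})
	fmt.Println(string(wrapped), ids, len(errs))
}
```

Passing the per-node action as a function lets the same `insert` loop serve both delayed and scheduled publishing.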
As noted in the initialization section, conn is not created up front, so it has to be initialized before any data can be inserted:
node.Delay() // executed for each node in the configuration
    |- node.conn.get() // get conn from the node
    |- _, err := conn.Put(data, delay, opts...)
    |- node.conn.reset() // on err, such as OOM/timeout -> discard the conn to prevent leaks
Put(data, delay):
tube.Put(data, delay)
    |- tube.Conn.cmd("put", ...) // the producer publishes the job
This is the put command that a producer sends in the beanstalkd protocol:
put <pri> <delay> <ttr> <bytes>\r\n<data>\r\n
<pri>: priority. A smaller value indicates a higher priority. The default value is 1024;
<delay>: the number of seconds to wait before the job becomes ready. During this time the job is in the delayed state;
<ttr>: time to run, the maximum number of seconds a worker is allowed to spend on the job. If the worker does not delete, release, or bury the job within this period, the job times out and the server releases it automatically;
<bytes>: the length of the job body, not including the trailing \r\n;
<data>: the job body.
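To make the field order concrete, here is a small sketch that renders a put command from its parts. `buildPut` is a hypothetical helper written for this article, not a function from dq or from a beanstalk client library.

```go
package main

import (
	"fmt"
	"time"
)

// buildPut renders a beanstalkd "put" command for the given job body.
// delay and ttr are truncated to whole seconds, since the protocol uses integer seconds.
func buildPut(pri uint32, delay, ttr time.Duration, body []byte) []byte {
	header := fmt.Sprintf("put %d %d %d %d\r\n",
		pri, int64(delay/time.Second), int64(ttr/time.Second), len(body))
	cmd := append([]byte(header), body...)
	return append(cmd, '\r', '\n') // the trailing CRLF is not counted in <bytes>
}

func main() {
	cmd := buildPut(1024, 5*time.Second, time.Minute, []byte("1000"))
	fmt.Printf("%q\n", cmd) // prints "put 1024 5 60 4\r\n1000\r\n"
}
```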
So what does the server return once the job has been inserted?
INSERTED <id>\r\n
The returned id is the id of the job. That completes the analysis of put; now back to the code:
tube.Put(data, priority, delay, ttr)
    |- tube.Conn.cmd("put", ...)
    |- tube.Conn.readResp("INSERTED id")
    |- return id, err // return the id
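Reading the reply boils down to matching the INSERTED prefix and parsing the number that follows. A minimal sketch, assuming the reply line has already been read from the connection; `parseInserted` is a hypothetical helper and not the client library's readResp.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseInserted extracts the job id from an "INSERTED <id>\r\n" reply.
// Any other reply (for example "JOB_TOO_BIG") is reported as an error.
func parseInserted(line string) (uint64, error) {
	line = strings.TrimSuffix(line, "\r\n")
	const prefix = "INSERTED "
	if !strings.HasPrefix(line, prefix) {
		return 0, fmt.Errorf("unexpected reply: %q", line)
	}
	return strconv.ParseUint(strings.TrimPrefix(line, prefix), 10, 64)
}

func main() {
	id, err := parseInserted("INSERTED 42\r\n")
	fmt.Println(id, err)

	_, err = parseInserted("JOB_TOO_BIG\r\n")
	fmt.Println(err)
}
```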
This completes the producer operations that are directly visible in the example. With those covered, the rest of the producer interface is easier to discuss.
Producer interface
In addition to the Delay() used in the example, there are several other methods:
type Producer interface {
	At(body []byte, at time.Time) (string, error)
	Close() error
	Delay(body []byte, delay time.Duration) (string, error)
	Revoke(ids string) error
}
At: executes at a specified point in time [built on Delay()];
Close: closes all node connections;
Delay: delayed execution; the delay duration is passed in;
Revoke: deletes jobs that were already published. In essence, when fewer than the minimum number of nodes (2) were written, the insert is treated as failed and the job is deleted.
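The Revoke rule can be sketched as a small decision function. This is an illustration under assumptions, not dq's code: `result`, `settle`, `errNodeDown`, and the comma-joined id format are hypothetical, and only the threshold of 2 comes from the description above.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

const minWrittenNodes = 2 // threshold taken from the description above

var errNodeDown = errors.New("node unavailable") // sample error for the demo

// result is the outcome of publishing one job to one node.
type result struct {
	id  string
	err error
}

// settle returns the joined ids when enough nodes accepted the job.
// Otherwise it revokes whatever was written and reports a failure.
func settle(results []result, revoke func(ids string) error) (string, error) {
	var ids []string
	var errs []error
	for _, r := range results {
		if r.err != nil {
			errs = append(errs, r.err)
			continue
		}
		ids = append(ids, r.id)
	}

	joined := strings.Join(ids, ",")
	if len(ids) >= minWrittenNodes {
		return joined, nil
	}
	if joined != "" {
		if err := revoke(joined); err != nil {
			errs = append(errs, err)
		}
	}
	return "", fmt.Errorf("only %d of %d nodes accepted the job: %v",
		len(ids), len(results), errs)
}

func main() {
	revoked := ""
	revoke := func(ids string) error { revoked = ids; return nil }

	fmt.Println(settle([]result{{id: "a/1"}, {id: "b/1"}}, revoke))
	fmt.Println(settle([]result{{id: "a/2"}, {err: errNodeDown}}, revoke))
	fmt.Println("revoked:", revoked)
}
```

Note that this sketch treats a single-node cluster as a failure too, because one write is below the threshold; how dq handles that case is not covered here.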
In practice, developers using dq only need At and Delay. All you have to decide is whether your task should run at a fixed time or after a delay; dq's internals take care of the rest.
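The difference between the two calls is only how the time is expressed: a scheduled task supplies an absolute time, a delayed task supplies a duration. A minimal sketch of turning the former into the latter, assuming a scheduled time in the past is rejected; `delayUntil`, `errTimeInPast`, and `fixedNow` are hypothetical names used only here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeInPast = errors.New("execution time is before now")

// fixedNow is a fixed reference time that keeps the example deterministic.
var fixedNow = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

// delayUntil converts an absolute execution time into the relative delay
// that a Delay-style call expects.
func delayUntil(at, now time.Time) (time.Duration, error) {
	if at.Before(now) {
		return 0, errTimeInPast
	}
	return at.Sub(now), nil
}

func main() {
	d, err := delayUntil(fixedNow.Add(5*time.Second), fixedNow)
	fmt.Println(d, err) // prints "5s <nil>"

	_, err = delayUntil(fixedNow.Add(-time.Second), fixedNow)
	fmt.Println(err) // prints "execution time is before now"
}
```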
Project address
Github.com/tal-tech/go…
go-zero's stream processing library fx is also used extensively in go-queue.
Github.com/tal-tech/go…
You are welcome to use go-queue and to star the project to support us! Let's build the go-zero ecosystem together! 👍