- Why write this library
- What are the application scenarios
- How to use
- conclusion
Why write this library?
Before starting our own work on go-Queue, we investigated current open source queue solutions for the following:
beanstalkd
Beanstalkd has some useful features: priority, delay, time-to-run and buried, and it can support distributed background tasks and scheduled task processing well. Here are the basics of Beanstalkd:
job
: Task unit;tube
: Task queue, storage of the same typejob
. Producer and consumer operate objects;producer
:job
Producer, adding job to a tube by putting;consumer
:job
Consumers, through the reserve/release/bury/delete to get the job or change the state of the job;
Fortunately, the official go Client is available: github.com/beanstalkd/…
But there is a learning curve for Go developers unfamiliar with The Beanstalkd operation.
kafka
Similar to kafka message queue as the storage scheme, the storage unit is the message, if you want to achieve delayed execution, you can think of the scheme is delayed execution time as topic, so in a large message system, There are a large number of one-time topics (DQ_1616324404788, DQ_1616324417622). When time is scattered, random disk write is easy to occur.
And in the GO ecology,
Also consider the following factors:
- Support for delayed tasks
- High availability ensures data loss
- Extensible resources and performance
So we developed our own Go-Queue based on these two basic components:
- Based on the
beanstalkd
developeddq
Supports timing and delay operations. At the same time to joinredis
Ensure consumption uniqueness. - Based on the
kafka
developedkq
Simplifies producer and consumer development apis while using bulk write in Kafka to save IO.
The overall design is as follows:
Application scenarios
First, in the consumption scenario, one is for the task queue and one is for the message queue. And the biggest difference:
- Tasks are not sequentially constrained; Message needs;
- A task may have status updates (or cancellations) while it is being added or waiting. Messages are a single store;
Therefore, the selection of infrastructure behind is also based on this consumption scenario.
dq
: depends on thebeanstalkd
, suitable for delayed and scheduled task execution;kq
: depends on thekafka
, suitable for asynchronous and batch task execution;
It can also be seen from the DQ API:
// Delay task execution
- dq.Delay(msg, delayTime);
// Scheduled task execution
- dq.At(msg, atTime);
Copy the code
And within us:
- If it isAsynchronous message consumption/push, will select use
kq
:kq.Push(msg)
; - If it is15-minute reminder/ Text message tomorrow at noonAnd so on
dq
;
How to use
Dq and KQ are introduced respectively:
dq
// [Producer]
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",}})for i := 1000; i < 1005; i++ {
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
iferr ! =nil {
fmt.Println(err)
}
}
Copy the code
// [Consumer]
consumer := dq.NewConsumer(dq.DqConf{
Beanstalks: []dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11301",
Tube: "tube",
},
},
Redis: redis.RedisConf{
Host: "localhost:6379",
Type: redis.NodeType,
},
})
consumer.Consume(func(body []byte) {
// your consume logic
fmt.Println(string(body))
})
Copy the code
Similar to the normal producer-consumer model, developers only need to focus on the following:
- Developers only need to focus on their own task type “delay/timing”
- Consumption logic at the consumer end
kq
Producer. The go:
// message structure
type message struct {
Key string `json:"key"`
Value string `json:"value"`
Payload string `json:"message"`
}
pusher := kq.NewPusher([]string{
"127.0.0.1:19092"."127.0.0.1:19093"."127.0.0.1:19094",},"kq")
ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
select {
case <-ticker.C:
count := rand.Intn(100)
// Prepare the message
m := message{
Key: strconv.FormatInt(time.Now().UnixNano(), 10),
Value: fmt.Sprintf("%d,%d", round, count),
Payload: fmt.Sprintf("%d,%d", round, count),
}
body, err := json.Marshal(m)
iferr ! =nil {
log.Fatal(err)
}
fmt.Println(string(body))
// push to kafka broker
if err := pusher.Push(string(body)); err ! =nil {
log.Fatal(err)
}
}
}
Copy the code
The config. Yaml:
Name: kq
Brokers:
- 127.0. 01.: 19092
- 127.0. 01.: 19092
- 127.0. 01.: 19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1
Copy the code
Consumer. Go:
var c kq.KqConf
conf.MustLoad("config.yaml", &c)
// WithHandle: Specifies the logic for processing MSG
// This is something that developers need to customize for their own business
q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
fmt.Printf("=> %s\n", v)
return nil
}))
defer q.Stop()
q.Start()
Copy the code
Unlike DQ, the developer doesn’t have to worry about task types (there’s no concept of task here, just Message Data).
Other operations are similar to DQ, except that the business processing function is passed directly to the consumer as a configuration.
conclusion
In our current scenario, KQ is heavily used in our asynchronous messaging service; For delayed tasks, in addition to DQ, we can also use the in-memory version of TimingWheel “Go-Zero ecological component”.
Stay tuned for more design and implementation articles on Go-Queue. Welcome to pay attention to and use.
Github.com/tal-tech/go…
Github.com/tal-tech/go…
Welcome to Go-Zero and star support us!