1. Why write this library
  2. Application scenarios
  3. How to use
  4. Conclusion

Why write this library?

Before starting our own work on go-queue, we surveyed the existing open source queue solutions:

beanstalkd

Beanstalkd has some useful features (priority, delay, time-to-run, and buried jobs) and handles distributed background tasks and scheduled tasks well. The basic concepts of Beanstalkd:

  • job: the unit of work;
  • tube: a queue of jobs of the same type, and the object that producers and consumers operate on;
  • producer: creates jobs, adding them to a tube with put;
  • consumer: fetches jobs with reserve and changes their state with release/bury/delete;

Fortunately, an official Go client is available: github.com/beanstalkd/…

But there is a learning curve for Go developers unfamiliar with beanstalkd operations.

kafka

With kafka, the message queue itself is the storage scheme and the storage unit is the message. To implement delayed execution, the obvious scheme is to encode the execution time into the topic name, which in a large message system produces a large number of one-off topics (DQ_1616324404788, DQ_1616324417622, …). When delivery times are scattered, this easily degenerates into random disk writes.

In the Go ecosystem, we also weighed the following factors:

  • Support for delayed tasks
  • High availability with no data loss
  • Scalable resources and performance

So we developed our own go-queue on top of these two basic components:

  1. dq, built on beanstalkd, supports scheduled and delayed operations, and brings in redis to guarantee consumption uniqueness.
  2. kq, built on kafka, simplifies the producer and consumer APIs, and uses kafka's batch writes to save IO.

The overall design is as follows:

Application scenarios

First, consumption scenarios divide into two kinds: the task queue and the message queue. The biggest differences:

  • Tasks have no ordering constraint; messages do;
  • A task may have its state updated (or be cancelled) after being added, while it waits to run; a message is stored once and never changes;

Therefore, the underlying infrastructure is also selected according to the consumption scenario:

  • dq: depends on beanstalkd; suited to delayed and scheduled task execution;
  • kq: depends on kafka; suited to asynchronous and batch task execution;

This can also be seen from the dq API:

```go
// Delayed task execution
dq.Delay(msg, delayTime)

// Scheduled task execution
dq.At(msg, atTime)
```

And inside our own systems:

  • For asynchronous message consumption/push, we pick kq: kq.Push(msg);
  • For "remind me in 15 minutes" or "send the text message tomorrow at noon", we pick dq;

How to use

dq and kq are introduced in turn:

dq

```go
// [Producer]
producer := dq.NewProducer([]dq.Beanstalk{
	{
		Endpoint: "localhost:11300",
		Tube:     "tube",
	},
	{
		Endpoint: "localhost:11301",
		Tube:     "tube",
	},
})

for i := 1000; i < 1005; i++ {
	_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
	if err != nil {
		fmt.Println(err)
	}
}
```
```go
// [Consumer]
consumer := dq.NewConsumer(dq.DqConf{
	Beanstalks: []dq.Beanstalk{
		{
			Endpoint: "localhost:11300",
			Tube:     "tube",
		},
		{
			Endpoint: "localhost:11301",
			Tube:     "tube",
		},
	},
	Redis: redis.RedisConf{
		Host: "localhost:6379",
		Type: redis.NodeType,
	},
})

consumer.Consume(func(body []byte) {
	// your consume logic
	fmt.Println(string(body))
})
```
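The Redis entry in the consumer config above is what backs consumption uniqueness across the two beanstalkd nodes: before invoking the callback, dq performs a set-if-absent style check on the job, so only the first node to claim it consumes it. The idea can be sketched with an in-memory map standing in for redis (illustrative; the `dedup` type and `claim` method are hypothetical, not the dq API):

```go
package main

import (
	"fmt"
	"sync"
)

// dedup mimics a redis set-if-absent check: the first caller to claim a
// job id wins; later deliveries of the same job are skipped.
type dedup struct {
	mu   sync.Mutex
	seen map[string]bool
}

func (d *dedup) claim(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[id] {
		return false
	}
	d.seen[id] = true
	return true
}

func main() {
	d := &dedup{seen: map[string]bool{}}
	// The same job delivered by two beanstalkd nodes is consumed once.
	fmt.Println(d.claim("job-1000")) // true: first delivery, consume it
	fmt.Println(d.claim("job-1000")) // false: duplicate, skip it
}
```

In production the map is replaced by redis so that the check works across processes and survives consumer restarts within the key's lifetime.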

As in the usual producer-consumer model, developers only need to focus on:

  1. the task type on the producer side: delayed (Delay) or scheduled (At);
  2. the consume logic on the consumer side.

kq

producer.go:

```go
// message structure
type message struct {
	Key     string `json:"key"`
	Value   string `json:"value"`
	Payload string `json:"message"`
}

pusher := kq.NewPusher([]string{
	"127.0.0.1:19092",
	"127.0.0.1:19093",
	"127.0.0.1:19094",
}, "kq")

ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
	select {
	case <-ticker.C:
		count := rand.Intn(100)
		// Prepare the message
		m := message{
			Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
			Value:   fmt.Sprintf("%d,%d", round, count),
			Payload: fmt.Sprintf("%d,%d", round, count),
		}
		body, err := json.Marshal(m)
		if err != nil {
			log.Fatal(err)
		}

		fmt.Println(string(body))
		// push to the kafka broker
		if err := pusher.Push(string(body)); err != nil {
			log.Fatal(err)
		}
	}
}
```

config.yaml:

```yaml
Name: kq
Brokers:
  - 127.0.0.1:19092
  - 127.0.0.1:19092
  - 127.0.0.1:19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1
```

consumer.go:

```go
var c kq.KqConf
conf.MustLoad("config.yaml", &c)

// WithHandle specifies the logic for processing each message.
// This is what developers customize for their own business.
q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
	fmt.Printf("=> %s\n", v)
	return nil
}))
defer q.Stop()
q.Start()
```

Unlike dq, the developer doesn't have to care about task types here: there is no concept of a task, only message data.

Other operations are similar to dq, except that the business handler is passed to the consumer directly as an option.

Conclusion

In our current systems, kq is heavily used by our asynchronous messaging services. For delayed tasks, besides dq, we can also use the in-memory TimingWheel, a go-zero ecosystem component.

Stay tuned for more articles on the design and implementation of go-queue. You are welcome to follow and use it.

Github.com/tal-tech/go…

Github.com/tal-tech/go…

Welcome to use go-zero, and star it to support us!