Pick up where we left off: We talked about modifying gateway & RPC in the App Module generated by Go-Zero. This article explains how to access asynchronous tasks and use logs.

Delay Job

In daily task opening, there are many asynchronous, batch, timed, and delayed tasks to be processed. Go-queue is recommended to be used in Go-Zero. Go-queue itself is also developed based on Go-Zero and has two modes:

  • dq : depends on the beanstalkd, distributed, storable, delay, timing Settings, shutdown restart can be re-executed, messages will be lost, the use of very simple, Go-queue using Redis setnx to ensure that each message is only consumed once, the use scenario is mainly used to do daily tasks
  • kq: depends on thekafkaI don’t have to tell you much about this onekafka, the usage scenario is mainly used for logging

We mainly talk about DQ, kQ is also used, but it depends on the different bottom layer. If you have not used BeanstalKD before, you can Google it first, it is quite easy to use.

I created a message-job. API service under Jobs using goctl

Info (title: // message task desc: // Message task author: "Mikael" email: "[email protected]" ) type BatchSendMessageReq {} type BatchSendMessageResp {} service message-job-api { @handler BatchSendMessageHandler // Sending SMS messages in batches post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)}Copy the code

Because there is no need to use routing, I deleted routes.go under handler, and created a new jobrun. go under handler, with the following content:

package handler

import (
	"fishtwo/lib/xgo"
	"fishtwo/app/jobs/message/internal/svc"
)


/** * @description Start job * @author Mikael * @date 2021/1/18 12:05 * @version 1.0 **/

func JobRun(serverCtx *svc.ServiceContext)  {
	xgo.Go(func(a) {
		batchSendMessageHandler(serverCtx)
    / /... many job})}Copy the code

In fact, xgo.Go is Go batchSendMessageHandler(serverCtx), encapsulate Go Ctrip, prevent wild Goroutine panic

Then modify the startup file message-job.go

package main

import (
   "flag"
   "fmt"

   "fishtwo/app/jobs/message/internal/config"
   "fishtwo/app/jobs/message/internal/handler"
   "fishtwo/app/jobs/message/internal/svc"

   "github.com/tal-tech/go-zero/core/conf"
   "github.com/tal-tech/go-zero/rest"
)

var configFile = flag.String("f"."etc/message-job-api.yaml"."the config file")

func main(a) {
   flag.Parse()

   var c config.Config
   conf.MustLoad(*configFile, &c)

   ctx := svc.NewServiceContext(c)
   server := rest.MustNewServer(c.RestConf)
   defer server.Stop()

   handler.JobRun(ctx)

   fmt.Printf("Starting server at %s:%d... \n", c.Host, c.Port)
   server.Start()
}
Copy the code

Handler.RegisterHandlers(server, CTX) changed to handler.JobRun(CTX)

Next, we can introduce dQ by adding dqConf to /etc/xxx. yaml

. DqConf: beancellulose: -endpoint: 127.0.0.1:7771 Tube: tube1 -endpoint: 127.0.0.1:7772 Tube: tube2 Redis: Host: 127.0.0.1:6379 Type: nodeCopy the code

Here I simulate two nodes, 7771 and 7772, using different ports locally

In the internal/config/config. Go add configuration parser object

type Config struct{... DqConf dq.DqConf }Copy the code

Modify the handler/batchsendmessagehandler. Go

package handler

import (
	"context"
	"fishtwo/app/jobs/message/internal/logic"
	"fishtwo/app/jobs/message/internal/svc"
	"github.com/tal-tech/go-zero/core/logx"
)

func batchSendMessageHandler(ctx *svc.ServiceContext){
	rootCxt:= context.Background()
	l := logic.NewBatchSendMessageLogic(context.Background(), ctx)
	err := l.BatchSendMessage()
	iferr ! =nil{
		logx.WithContext(rootCxt).Error("[job-err] : % v",err)
	}
}
Copy the code

Modify batchsendMessagelogic. Go to write our consumer logic

package logic

import (
   "context"
   "fishtwo/app/jobs/message/internal/svc"
   "fmt"
   "github.com/tal-tech/go-zero/core/logx"
)

type BatchSendMessageLogic struct {
   logx.Logger
   ctx    context.Context
   svcCtx *svc.ServiceContext
}

func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic {
   return BatchSendMessageLogic{
   	Logger: logx.WithContext(ctx),
   	ctx:    ctx,
   	svcCtx: svcCtx,
   }
}


func (l *BatchSendMessageLogic) BatchSendMessage(a) error {
   fmt.Println("job BatchSendMessage start")

   l.svcCtx.Consumer.Consume(func(body []byte) {
   	fmt.Printf("job BatchSendMessage %s \n" + string(body))
   })

   fmt.Printf("job BatchSendMessage finish \n")
   return nil
}
Copy the code

Now that you’re done, start message-job.go and let’s go

go run message-job.go
Copy the code

Then we can add tasks to the DQ in the business code, and it can be consumed automatically

5 Delay tasks are sent to dQ:

	producer := dq.NewProducer([]dq.Beanstalk{
		{
			Endpoint: "localhost:7771",
			Tube:     "tube1",
		},
		{
			Endpoint: "localhost:7772",
			Tube:     "tube2",}})for i := 1000; i < 1005; i++ {
		_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
		iferr ! =nil {
			fmt.Println(err)
		}
	}
Copy the code

Producer. At can specify a certain time to execute, very useful, interested friends can study their own.

The error log

Httpresult. go: httpresult.go: httpresult.go: Httpresult. go

Let’s see what happens in RPC. Okay

Yes, I added a GRPC interceptor to the main of each RPC startup www.yuque.com/tal-tech/go…

Then I use github/ PKG /errors package in the code to handle errors, this package is very useful

So:

Errorf(” [rpc-srV-err] %+v”, ERR); Errorf(” [rpc-srv-err] %+v”, ERR);

Error(” [gateway-srv-err] : %+v “, ERR); Error(” [gateway-srv-err] : %+v “, ERR)

Go zero prints a log, and using logx.WithContext will bring in the trace-ID, such that a request comes down, for example

user-api --> user-srv --> message-srv
Copy the code

If messsage-srv fails and the three of them have the same trace-ID, is it possible to search the stack information of this request error by entering this trace-ID in ELK? You can also access Jaeger, Zipkin, Skywalking, etc., which I haven’t yet.

Framework to address

Github.com/tal-tech/go…

Welcome to Go-Zero and star support us! 👍