Pick up where we left off: We talked about modifying gateway & RPC in the App Module generated by Go-Zero. This article explains how to access asynchronous tasks and use logs.
Delay Job
In daily task opening, there are many asynchronous, batch, timed, and delayed tasks to be processed. Go-queue is recommended to be used in Go-Zero. Go-queue itself is also developed based on Go-Zero and has two modes:
dq
: depends on thebeanstalkd
, distributed, storable, delay, timing Settings, shutdown restart can be re-executed, messages will be lost, the use of very simple, Go-queue using Redis setnx to ensure that each message is only consumed once, the use scenario is mainly used to do daily taskskq
: depends on thekafka
I don’t have to tell you much about this onekafka
, the usage scenario is mainly used for logging
We mainly talk about DQ, kQ is also used, but it depends on the different bottom layer. If you have not used BeanstalKD before, you can Google it first, it is quite easy to use.
I created a message-job. API service under Jobs using goctl
Info (title: // message task desc: // Message task author: "Mikael" email: "[email protected]" ) type BatchSendMessageReq {} type BatchSendMessageResp {} service message-job-api { @handler BatchSendMessageHandler // Sending SMS messages in batches post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)}Copy the code
Because there is no need to use routing, I deleted routes.go under handler, and created a new jobrun. go under handler, with the following content:
package handler
import (
"fishtwo/lib/xgo"
"fishtwo/app/jobs/message/internal/svc"
)
/** * @description Start job * @author Mikael * @date 2021/1/18 12:05 * @version 1.0 **/
func JobRun(serverCtx *svc.ServiceContext) {
xgo.Go(func(a) {
batchSendMessageHandler(serverCtx)
/ /... many job})}Copy the code
In fact, xgo.Go is Go batchSendMessageHandler(serverCtx), encapsulate Go Ctrip, prevent wild Goroutine panic
Then modify the startup file message-job.go
package main
import (
"flag"
"fmt"
"fishtwo/app/jobs/message/internal/config"
"fishtwo/app/jobs/message/internal/handler"
"fishtwo/app/jobs/message/internal/svc"
"github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/rest"
)
var configFile = flag.String("f"."etc/message-job-api.yaml"."the config file")
func main(a) {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)
server := rest.MustNewServer(c.RestConf)
defer server.Stop()
handler.JobRun(ctx)
fmt.Printf("Starting server at %s:%d... \n", c.Host, c.Port)
server.Start()
}
Copy the code
Handler.RegisterHandlers(server, CTX) changed to handler.JobRun(CTX)
Next, we can introduce dQ by adding dqConf to /etc/xxx. yaml
. DqConf: beancellulose: -endpoint: 127.0.0.1:7771 Tube: tube1 -endpoint: 127.0.0.1:7772 Tube: tube2 Redis: Host: 127.0.0.1:6379 Type: nodeCopy the code
Here I simulate two nodes, 7771 and 7772, using different ports locally
In the internal/config/config. Go add configuration parser object
type Config struct{... DqConf dq.DqConf }Copy the code
Modify the handler/batchsendmessagehandler. Go
package handler
import (
"context"
"fishtwo/app/jobs/message/internal/logic"
"fishtwo/app/jobs/message/internal/svc"
"github.com/tal-tech/go-zero/core/logx"
)
func batchSendMessageHandler(ctx *svc.ServiceContext){
rootCxt:= context.Background()
l := logic.NewBatchSendMessageLogic(context.Background(), ctx)
err := l.BatchSendMessage()
iferr ! =nil{
logx.WithContext(rootCxt).Error("[job-err] : % v",err)
}
}
Copy the code
Modify batchsendMessagelogic. Go to write our consumer logic
package logic
import (
"context"
"fishtwo/app/jobs/message/internal/svc"
"fmt"
"github.com/tal-tech/go-zero/core/logx"
)
type BatchSendMessageLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic {
return BatchSendMessageLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *BatchSendMessageLogic) BatchSendMessage(a) error {
fmt.Println("job BatchSendMessage start")
l.svcCtx.Consumer.Consume(func(body []byte) {
fmt.Printf("job BatchSendMessage %s \n" + string(body))
})
fmt.Printf("job BatchSendMessage finish \n")
return nil
}
Copy the code
Now that you’re done, start message-job.go and let’s go
go run message-job.go
Copy the code
Then we can add tasks to the DQ in the business code, and it can be consumed automatically
5 Delay tasks are sent to dQ:
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:7771",
Tube: "tube1",
},
{
Endpoint: "localhost:7772",
Tube: "tube2",}})for i := 1000; i < 1005; i++ {
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
iferr ! =nil {
fmt.Println(err)
}
}
Copy the code
Producer. At can specify a certain time to execute, very useful, interested friends can study their own.
The error log
Httpresult. go: httpresult.go: httpresult.go: Httpresult. go
Let’s see what happens in RPC. Okay
Yes, I added a GRPC interceptor to the main of each RPC startup www.yuque.com/tal-tech/go…
Then I use github/ PKG /errors package in the code to handle errors, this package is very useful
So:
Errorf(” [rpc-srV-err] %+v”, ERR); Errorf(” [rpc-srv-err] %+v”, ERR);
Error(” [gateway-srv-err] : %+v “, ERR); Error(” [gateway-srv-err] : %+v “, ERR)
Go zero prints a log, and using logx.WithContext will bring in the trace-ID, such that a request comes down, for example
user-api --> user-srv --> message-srv
Copy the code
If messsage-srv fails and the three of them have the same trace-ID, is it possible to search the stack information of this request error by entering this trace-ID in ELK? You can also access Jaeger, Zipkin, Skywalking, etc., which I haven’t yet.
Framework to address
Github.com/tal-tech/go…
Welcome to Go-Zero and star support us! 👍