sequence
This paper mainly studies the defaultProducer of RocketMQ-Client-Go
defaultProducer
Rocketmq – the client – go – v2.0.0 / producer/producer. Go
typedefaultProducer struct { group string client internal.RMQClient state int32 options producerOptions publishInfo sync.Map callbackCh chan interface{} interceptor primitive.Interceptor }Copy the code
- DefaultProducer defines group, Client, state, Options, publishInfo, callbackCh, and Interceptor
NewDefaultProducer
Rocketmq – the client – go – v2.0.0 / producer/producer. Go
func NewDefaultProducer(opts ... Option) (*defaultProducer, error) { defaultOpts := defaultProducerOptions()for _, apply := range opts {
apply(&defaultOpts)
}
srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
iferr ! = nil {return nil, errors.Wrap(err, "new Namesrv failed.")}if! defaultOpts.Credentials.IsEmpty() {
srvs.SetCredentials(defaultOpts.Credentials)
}
defaultOpts.Namesrv = srvs
producer := &defaultProducer{
group: defaultOpts.GroupName,
callbackCh: make(chan interface{}),
options: defaultOpts,
}
producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)
producer.interceptor = primitive.ChainInterceptors(producer.options.Interceptors...)
return producer, nil
}
Copy the code
- The NewDefaultProducer method creates NameServerAddrs using internal.NewNamesrv, then instantiates defaultProducer, Then instantiated internal. GetOrNewRocketMQClient and primitive. ChainInterceptors
Start
Rocketmq – the client – go – v2.0.0 / producer/producer. Go
func (p *defaultProducer) Start() error {
atomic.StoreInt32(&p.state, int32(internal.StateRunning))
if len(p.options.NameServerAddrs) == 0 {
p.options.Namesrv.UpdateNameServerAddress(p.options.NameServerDomain, p.options.InstanceName)
}
p.client.RegisterProducer(p.group, p)
p.client.Start()
return nil
}
Copy the code
- The enforcement of the Start method p.c lient. RegisterProducer and p.c lient. Start ()
Shutdown
Rocketmq – the client – go – v2.0.0 / producer/producer. Go
func (p *defaultProducer) Shutdown() error {
atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
p.client.UnregisterProducer(p.group)
p.client.Shutdown()
return nil
}
Copy the code
- Shutdown method performs p.c lient. UnregisterProducer and p.c lient. The Shutdown ()
SendSync
Rocketmq – the client – go – v2.0.0 / producer/producer. Go
func (p *defaultProducer) SendSync(ctx context.Context, msgs ... *primitive.Message) (*primitive.SendResult, error) {iferr := p.checkMsg(msgs...) ; err ! = nil {return nil, err
}
msg := p.encodeBatch(msgs...)
resp := new(primitive.SendResult)
ifp.interceptor ! = nil { primitive.WithMethod(ctx, primitive.SendSync) producerCtx := &primitive.ProducerCtx{ ProducerGroup: p.group, CommunicationMode: primitive.SendSync, BornHost: utils.LocalIP, Message: *msg, SendResult: resp, } ctx = primitive.WithProducerCtx(ctx, producerCtx) err := p.interceptor(ctx, msg, resp, func(ctx context.Context, req, reply interface{}) error { var err error realReq := req.(*primitive.Message) realReply := reply.(*primitive.SendResult) err = p.sendSync(ctx, realReq, realReply)return err
})
return resp, err
}
err := p.sendSync(ctx, msg, resp)
return resp, err
}
Copy the code
- The SendSync method first validates the message through P.heckmsg, then encodebatch, then executes p.interceptor for non-null p.interceptor, and finally executes p.sendsync (CTX, MSG, resp).
sendSync
Rocketmq – the client – go – v2.0.0 / producer/producer. Go
func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error {
retryTime := 1 + p.options.RetryTimes
var (
err error
)
ifp.options.Namespace ! ="" {
msg.Topic = p.options.Namespace + "%" + msg.Topic
}
var producerCtx *primitive.ProducerCtx
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
if mq == nil {
err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
continue
}
addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
if addr == "" {
return fmt.Errorf("topic=%s route info not found", mq.Topic)
}
ifp.interceptor ! = nil { producerCtx = primitive.GetProducerCtx(ctx) producerCtx.BrokerAddr = addr producerCtx.MQ = *mq } res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)if_err ! = nil { err = _errcontinue
}
return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
}
return err
}
Copy the code
- , will retry sendSync retryCount is by p. electMessageQueue select mq (MSG), and then through the p.o ptions. Namesrv. FindBrokerAddrByName find addr, InvokeSync(CTX, ADDR, p.buildsendrequest
SendAsync
Rocketmq – the client – go – v2.0.0 / producer/producer. Go
func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context, *primitive.SendResult, error), msgs ... *primitive.Message) error {iferr := p.checkMsg(msgs...) ; err ! = nil {return err
}
msg := p.encodeBatch(msgs...)
ifp.interceptor ! = nil { primitive.WithMethod(ctx, primitive.SendAsync)return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
return p.sendAsync(ctx, msg, f)
})
}
return p.sendAsync(ctx, msg, f)
}
Copy the code
- The SendAsync method basically executes p.sendAsync(CTX, MSG, f)
sendAsync
Rocketmq – the client – go – v2.0.0 / producer/producer. Go
func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
ifp.options.Namespace ! ="" {
msg.Topic = p.options.Namespace + "%" + msg.Topic
}
mq := p.selectMessageQueue(msg)
if mq == nil {
return errors.Errorf("the topic=%s route info not found", msg.Topic)
}
addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
if addr == "" {
return errors.Errorf("topic=%s route info not found", mq.Topic)
}
ctx, _ = context.WithTimeout(ctx, 3*time.Second)
return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
resp := new(primitive.SendResult)
iferr ! = nil { h(ctx, nil, err) }else {
p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg)
h(ctx, resp, nil)
}
})
}
Copy the code
- SendAsync mainly executes p.client.invokeasync
SendOneWay
Rocketmq – the client – go – v2.0.0 / producer/producer. Go
func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ... *primitive.Message) error {iferr := p.checkMsg(msgs...) ; err ! = nil {return err
}
msg := p.encodeBatch(msgs...)
ifp.interceptor ! = nil { primitive.WithMethod(ctx, primitive.SendOneway)return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
return p.SendOneWay(ctx, msg)
})
}
return p.sendOneWay(ctx, msg)
}
Copy the code
- SendOneWay: p.sendoneway (CTX, MSG)
sendOneWay
Rocketmq – the client – go – v2.0.0 / producer/producer. Go
func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
retryTime := 1 + p.options.RetryTimes
ifp.options.Namespace ! ="" {
msg.Topic = p.options.Namespace + "%" + msg.Topic
}
var err error
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
if mq == nil {
err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
continue
}
addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
if addr == "" {
return fmt.Errorf("topic=%s route info not found", mq.Topic)
}
_err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
if_err ! = nil { err = _errcontinue
}
return nil
}
return err
}
Copy the code
- SendOneWay mainly retries the p.client.invokeoneway
summary
DefaultProducer defines Group, Client, state, Options, publishInfo, callbackCh, and Interceptor. It provides the NewDefaultProducer, Start, Shutdown, SendSync, SendAsync, and SendOneWay methods
doc
- producer