Preface

This article examines the defaultProducer of rocketmq-client-go.

defaultProducer

rocketmq-client-go-v2.0.0/producer/producer.go

typedefaultProducer struct { group string client internal.RMQClient state int32 options producerOptions publishInfo sync.Map  callbackCh chan interface{} interceptor primitive.Interceptor }Copy the code
  • defaultProducer defines the fields group, client, state, options, publishInfo, callbackCh, and interceptor

NewDefaultProducer

rocketmq-client-go-v2.0.0/producer/producer.go

func NewDefaultProducer(opts ... Option) (*defaultProducer, error) { defaultOpts := defaultProducerOptions()for _, apply := range opts {
		apply(&defaultOpts)
	}
	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
	iferr ! = nil {return nil, errors.Wrap(err, "new Namesrv failed.")}if! defaultOpts.Credentials.IsEmpty() {
		srvs.SetCredentials(defaultOpts.Credentials)
	}
	defaultOpts.Namesrv = srvs

	producer := &defaultProducer{
		group:      defaultOpts.GroupName,
		callbackCh: make(chan interface{}),
		options:    defaultOpts,
	}
	producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)

	producer.interceptor = primitive.ChainInterceptors(producer.options.Interceptors...)

	return producer, nil
}
Copy the code
  • The NewDefaultProducer method creates a Namesrv from NameServerAddrs via internal.NewNamesrv, instantiates defaultProducer, and then sets its client via internal.GetOrNewRocketMQClient and its interceptor via primitive.ChainInterceptors

Start

rocketmq-client-go-v2.0.0/producer/producer.go

// Start switches the producer into the running state, registers it with the
// client under its group name and starts the client.
//
// When no name server addresses were configured, the Namesrv is first asked
// to update its address using the configured domain and instance name.
// The returned error is always nil.
func (p *defaultProducer) Start() error {
	running := int32(internal.StateRunning)
	atomic.StoreInt32(&p.state, running)

	opts := &p.options
	if noStaticAddrs := len(opts.NameServerAddrs) == 0; noStaticAddrs {
		opts.Namesrv.UpdateNameServerAddress(opts.NameServerDomain, opts.InstanceName)
	}

	cli := p.client
	cli.RegisterProducer(p.group, p)
	cli.Start()

	return nil
}
Copy the code
  • The Start method calls p.client.RegisterProducer and p.client.Start()

Shutdown

rocketmq-client-go-v2.0.0/producer/producer.go

// Shutdown switches the producer into the shutdown state, unregisters it
// from the client by group name and shuts the client down.
// The returned error is always nil.
func (p *defaultProducer) Shutdown() error {
	stopped := int32(internal.StateShutdown)
	atomic.StoreInt32(&p.state, stopped)

	cli := p.client
	cli.UnregisterProducer(p.group)
	cli.Shutdown()

	return nil
}
Copy the code
  • The Shutdown method calls p.client.UnregisterProducer and p.client.Shutdown()

SendSync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) SendSync(ctx context.Context, msgs ... *primitive.Message) (*primitive.SendResult, error) {iferr := p.checkMsg(msgs...) ; err ! = nil {return nil, err
	}

	msg := p.encodeBatch(msgs...)

	resp := new(primitive.SendResult)
	ifp.interceptor ! = nil { primitive.WithMethod(ctx, primitive.SendSync) producerCtx := &primitive.ProducerCtx{ ProducerGroup: p.group, CommunicationMode: primitive.SendSync, BornHost: utils.LocalIP, Message: *msg, SendResult: resp, } ctx = primitive.WithProducerCtx(ctx, producerCtx) err := p.interceptor(ctx, msg, resp, func(ctx context.Context, req, reply interface{}) error { var err error realReq := req.(*primitive.Message) realReply := reply.(*primitive.SendResult) err = p.sendSync(ctx, realReq, realReply)return err
		})
		return resp, err
	}

	err := p.sendSync(ctx, msg, resp)
	return resp, err
}
Copy the code
  • The SendSync method first validates the messages with p.checkMsg and encodes them with p.encodeBatch; if p.interceptor is non-nil it runs p.sendSync through p.interceptor, otherwise it calls p.sendSync(ctx, msg, resp) directly.

sendSync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error {

	retryTime := 1 + p.options.RetryTimes

	var (
		err error
	)

	ifp.options.Namespace ! ="" {
		msg.Topic = p.options.Namespace + "%" + msg.Topic
	}

	var producerCtx *primitive.ProducerCtx
	for retryCount := 0; retryCount < retryTime; retryCount++ {
		mq := p.selectMessageQueue(msg)
		if mq == nil {
			err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
			continue
		}

		addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
		if addr == "" {
			return fmt.Errorf("topic=%s route info not found", mq.Topic)
		}

		ifp.interceptor ! = nil { producerCtx = primitive.GetProducerCtx(ctx) producerCtx.BrokerAddr = addr producerCtx.MQ = *mq } res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)if_err ! = nil { err = _errcontinue
		}
		return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
	}
	return err
}
Copy the code
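  • The sendSync method makes up to 1 + RetryTimes attempts: each attempt selects an mq with p.selectMessageQueue(msg), finds addr with p.options.Namesrv.FindBrokerAddrByName, and then calls p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second); on success it returns the result of p.client.ProcessSendResponse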

SendAsync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context, *primitive.SendResult, error), msgs ... *primitive.Message) error {iferr := p.checkMsg(msgs...) ; err ! = nil {return err
	}

	msg := p.encodeBatch(msgs...)

	ifp.interceptor ! = nil { primitive.WithMethod(ctx, primitive.SendAsync)return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
			return p.sendAsync(ctx, msg, f)
		})
	}
	return p.sendAsync(ctx, msg, f)
}
Copy the code
  • The SendAsync method mainly executes p.sendAsync(ctx, msg, f)

sendAsync

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
	ifp.options.Namespace ! ="" {
		msg.Topic = p.options.Namespace + "%" + msg.Topic
	}
	mq := p.selectMessageQueue(msg)
	if mq == nil {
		return errors.Errorf("the topic=%s route info not found", msg.Topic)
	}

	addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
	if addr == "" {
		return errors.Errorf("topic=%s route info not found", mq.Topic)
	}

	ctx, _ = context.WithTimeout(ctx, 3*time.Second)
	return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
		resp := new(primitive.SendResult)
		iferr ! = nil { h(ctx, nil, err) }else {
			p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg)
			h(ctx, resp, nil)
		}
	})
}
Copy the code
  • The sendAsync method mainly calls p.client.InvokeAsync

SendOneWay

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ... *primitive.Message) error {iferr := p.checkMsg(msgs...) ; err ! = nil {return err
	}

	msg := p.encodeBatch(msgs...)

	ifp.interceptor ! = nil { primitive.WithMethod(ctx, primitive.SendOneway)return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
			return p.SendOneWay(ctx, msg)
		})
	}

	return p.sendOneWay(ctx, msg)
}
Copy the code
  • The SendOneWay method mainly executes p.sendOneWay(ctx, msg)

sendOneWay

rocketmq-client-go-v2.0.0/producer/producer.go

func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
	retryTime := 1 + p.options.RetryTimes

	ifp.options.Namespace ! ="" {
		msg.Topic = p.options.Namespace + "%" + msg.Topic
	}

	var err error
	for retryCount := 0; retryCount < retryTime; retryCount++ {
		mq := p.selectMessageQueue(msg)
		if mq == nil {
			err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
			continue
		}

		addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
		if addr == "" {
			return fmt.Errorf("topic=%s route info not found", mq.Topic)
		}

		_err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
		if_err ! = nil { err = _errcontinue
		}
		return nil
	}
	return err
}
Copy the code
  • The sendOneWay method mainly calls p.client.InvokeOneWay, retrying on failure

Summary

defaultProducer defines the fields group, client, state, options, publishInfo, callbackCh, and interceptor. It provides the NewDefaultProducer constructor and the Start, Shutdown, SendSync, SendAsync, and SendOneWay methods.

doc

  • producer