sequence
This article focuses on remoteBrokerOffsetStore for RocketMQ-Client-Go
remoteBrokerOffsetStore
Rocketmq – the client – go – v2.0.0 / consumer/offset_store. Go
type remoteBrokerOffsetStore struct {
group string
OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"`
client internal.RMQClient
namesrv internal.Namesrvs
mutex sync.RWMutex
}
Copy the code
- RemoteBrokerOffsetStore defines the Group, OffsetTable, Client, NamesRV, mutex attributes
NewRemoteOffsetStore
Rocketmq – the client – go – v2.0.0 / consumer/offset_store. Go
func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore {
return &remoteBrokerOffsetStore{
group: group,
client: client,
namesrv: namesrv,
OffsetTable: make(map[primitive.MessageQueue]int64),
}
}
Copy the code
- The NewRemoteOffsetStore method instantiates remoteBrokerOffsetStore
persist
Rocketmq – the client – go – v2.0.0 / consumer/offset_store. Go
func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
r.mutex.Lock()
defer r.mutex.Unlock()
if len(mqs) == 0 {
return
}
used := make(map[primitive.MessageQueue]struct{}, 0)
for _, mq := range mqs {
used[*mq] = struct{}{}
}
for mq, off := range r.OffsetTable {
if_, ok := used[mq]; ! ok { delete(r.OffsetTable, mq)continue
}
err := r.updateConsumeOffsetToBroker(r.group, mq, off)
iferr ! = nil { rlog.Warning("update offset to broker error", map[string]interface{}{
rlog.LogKeyConsumerGroup: r.group,
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyUnderlayError: err.Error(),
"offset": off,
})
} else {
rlog.Info("update offset to broker success", map[string]interface{}{
rlog.LogKeyConsumerGroup: r.group,
rlog.LogKeyMessageQueue: mq.String(),
"offset": off,
})
}
}
}
Copy the code
- Persist methods traverse OffsetTable, perform r.u pdateConsumeOffsetToBroker
remove
Rocketmq – the client – go – v2.0.0 / consumer/offset_store. Go
func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {
r.mutex.Lock()
defer r.mutex.Unlock()
delete(r.OffsetTable, *mq)
rlog.Info("delete mq from offset table", map[string]interface{}{
rlog.LogKeyMessageQueue: mq,
})
}
Copy the code
- Remove method execution
delete(r.OffsetTable, *mq)
read
Rocketmq – the client – go – v2.0.0 / consumer/offset_store. Go
func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
r.mutex.RLock()
switch t {
case _ReadFromMemory, _ReadMemoryThenStore:
off, exist := r.OffsetTable[*mq]
if exist {
r.mutex.RUnlock()
return off
}
if t == _ReadFromMemory {
r.mutex.RUnlock()
return -1
}
fallthrough
case _ReadFromStore:
off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
iferr ! = nil { rlog.Error("fecth offset of mq error", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyUnderlayError: err,
})
r.mutex.RUnlock()
return -1
}
r.mutex.RUnlock()
r.update(mq, off, true)
return off
default:
}
return1}Copy the code
- The read method for _ReadFromStore executes r.f etchConsumeOffsetFromBroker
update
Rocketmq – the client – go – v2.0.0 / consumer/offset_store. Go
func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
r.mutex.Lock()
defer r.mutex.Unlock()
localOffset, exist := r.OffsetTable[*mq]
if! exist { r.OffsetTable[*mq] = offsetreturn
}
if increaseOnly {
if localOffset < offset {
r.OffsetTable[*mq] = offset
}
} else {
r.OffsetTable[*mq] = offset
}
}
Copy the code
- Update method updates r.offsettable [*mq]
fetchConsumeOffsetFromBroker
Rocketmq – the client – go – v2.0.0 / consumer/offset_store. Go
func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {
broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName)
if broker == "" {
r.namesrv.UpdateTopicRouteInfo(mq.Topic)
broker = r.namesrv.FindBrokerAddrByName(mq.BrokerName)
}
if broker == "" {
return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
}
queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
ConsumerGroup: group,
Topic: mq.Topic,
QueueId: mq.QueueId,
}
cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
iferr ! = nil {return -1, err
}
ifres.Code ! = internal.ResSuccess {return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
}
off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
iferr ! = nil {return -1, err
}
return off, nil
}
Copy the code
- FetchConsumeOffsetFromBroker method build QueryConsumerOffsetRequestHeader request, and then by r.c lient. InvokeSync initiated the request
updateConsumeOffsetToBroker
Rocketmq – the client – go – v2.0.0 / consumer/offset_store. Go
func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq primitive.MessageQueue, off int64) error {
broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName)
if broker == "" {
r.namesrv.UpdateTopicRouteInfo(mq.Topic)
broker = r.namesrv.FindBrokerAddrByName(mq.BrokerName)
}
if broker == "" {
return fmt.Errorf("broker: %s address not found", mq.BrokerName)
}
updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
ConsumerGroup: group,
Topic: mq.Topic,
QueueId: mq.QueueId,
CommitOffset: off,
}
cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
}
Copy the code
- UpdateConsumeOffsetToBroker method build UpdateConsumerOffsetRequestHeader request, and then by r.c lient. InvokeOneWay initiate the request
summary
RemoteBrokerOffsetStore defines the Group, OffsetTable, Client, NamesRV, mutex attributes; It provides NewRemoteOffsetStore, persist, remove, read, update, fetchConsumeOffsetFromBroker, updateConsumeOffsetToBroker method
doc
- offset_store