sequence

This article focuses on QueueSelector for RocketMQ-Client-Go

QueueSelector

Rocketmq – the client – go – v2.0.0 / producer/selector. Go

type QueueSelector interface {
	Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue
}
Copy the code
  • QueueSelector interface, which defines the Select method

manualQueueSelector

Rocketmq – the client – go – v2.0.0 / producer/selector. Go

type manualQueueSelector struct{}

func NewManualQueueSelector() QueueSelector {
	return new(manualQueueSelector)
}

func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
	return message.Queue
}
Copy the code
  • The manualQueueSelector select method returns message.queue directly

NewRandomQueueSelector

Rocketmq – the client – go – v2.0.0 / producer/selector. Go

type randomQueueSelector struct {
	rander *rand.Rand
}

func NewRandomQueueSelector() QueueSelector {
	s := new(randomQueueSelector)
	s.rander = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
	return s
}

func (r randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
	i := r.rander.Intn(len(queues))
	return queues[i]
}
Copy the code
  • The NewRandomQueueSelector method creates randomQueueSelector and then sets its rander; The Select method uses r.rand.intn (len(queues)) to randomly Select the index and queue

roundRobinQueueSelector

Rocketmq – the client – go – v2.0.0 / producer/selector. Go

typeroundRobinQueueSelector struct { sync.Locker indexer map[string]*int32 } func NewRoundRobinQueueSelector() QueueSelector  { s := &roundRobinQueueSelector{ Locker: new(sync.Mutex), indexer: map[string]*int32{}, }return s
}

func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
	t := message.Topic
	if_, exist := r.indexer[t]; ! exist { r.Lock()if_, exist := r.indexer[t]; ! exist { var v = int32(0) r.indexer[t] = &v } r.Unlock() } index := r.indexer[t] i := atomic.AddInt32(index, 1)if i < 0 {
		i = -i
		atomic.StoreInt32(index, 0)
	}
	qIndex := int(i) % len(queues)
	return queues[qIndex]
}
Copy the code
  • RoundRobinQueueSelector qIndex forint(i) % len(queues)

hashQueueSelector

Rocketmq – the client – go – v2.0.0 / producer/selector. Go

type hashQueueSelector struct {
	random QueueSelector
}

func NewHashQueueSelector() QueueSelector {
	return &hashQueueSelector{
		random: NewRandomQueueSelector(),
	}
}

// hashQueueSelector choose the queue by hash if message having sharding key, otherwise choose queue by random instead.
func (h *hashQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
	key := message.GetShardingKey()
	if len(key) == 0 {
		return h.random.Select(message, queues)
	}

	hasher := fnv.New32a()
	_, err := hasher.Write([]byte(key))
	iferr ! = nil {return nil
	}
	queueId := int(hasher.Sum32()) % len(queues)
	if queueId < 0 {
		queueId = -queueId
	}
	return queues[queueId]
}
Copy the code
  • HashQueueSelector throughint(hasher.Sum32()) % len(queues)To compute the index of the queue

summary

Rocketmq-client-go selector. Go defines manualQueueSelector, roundRobinQueueSelector, and hashQueueSelector

doc

  • selector.go