Libraries to rely on

github.com/Shopify/sarama       // the main Kafka library
github.com/bsm/sarama-cluster   // Kafka consumer groups
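The producer and consumer below also import a small kafkaDemo/define package for the broker address and topic name. That package is not shown in the post; a minimal sketch of what it might contain is given here, and the concrete values are placeholders rather than values taken from the demo:

package define

// SERVER_LIST is the Kafka broker address and TOPIC is the topic the demo
// writes to and reads from. Adjust both to match your environment.
const (
	SERVER_LIST = "127.0.0.1:9092" // placeholder broker address
	TOPIC       = "test_log"       // placeholder topic name
)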

The producer

package producer

import (
	"fmt"
	"github.com/HappyTeemo7569/teemoKit/tlog"
	"github.com/Shopify/sarama"
	"kafkaDemo/define"
)

var (
	ProducerId = 1
)

type Producer struct {
	Producer   sarama.SyncProducer
	Topic      string // Topic name
	ProducerID int    // Producer Id
	MessageId  int
}

func (p *Producer) InitProducer() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // Wait for the leader and all in-sync replicas to acknowledge each message
	config.Producer.Partitioner = sarama.NewRandomPartitioner // Pick a random partition for each message
	config.Producer.Return.Successes = true                   // Successfully delivered messages will be returned in success Channel

	// Connect to Kafka
	client, err := sarama.NewSyncProducer([]string{define.SERVER_LIST}, config)
	if err != nil {
		tlog.Error("producer closed, err:", err)
		return
	}

	p.Producer = client
	p.Topic = define.TOPIC
	p.ProducerID = ProducerId
	p.MessageId = 1

	ProducerId++
}

func (p *Producer) SendMessage() {
	// Construct a message
	msg := &sarama.ProducerMessage{}
	msg.Topic = p.Topic
	txt := fmt.Sprintf("ProducerID:%d this is a test log %d",
		p.ProducerID, p.MessageId)
	msg.Value = sarama.StringEncoder(txt)

	// Send a message
	pid, offset, err := p.Producer.SendMessage(msg)
	//_, _, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	tlog.Info(fmt.Sprintf("ProducerID:%d pid:%v offset:%v msg:%s",
		p.ProducerID, pid, offset, txt))

	p.MessageId++
}

func (p *Producer) Close() {
	p.Producer.Close()
}

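The demo uses sarama.NewRandomPartitioner, so each message lands on an arbitrary partition. If per-key ordering matters, a keyed message combined with the hash partitioner keeps all messages for the same key on one partition. Below is a small sketch of that variant; it is not part of the demo, it assumes the sarama import from above, and the key "user-42" is a made-up example:

// sendKeyed shows a keyed message with the hash partitioner, so every
// message with the same key goes to the same partition.
func sendKeyed(brokers []string, topic string) error {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewHashPartitioner // same key -> same partition
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return err
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder("user-42"), // hypothetical key
		Value: sarama.StringEncoder("keyed test log"),
	}
	_, _, err = producer.SendMessage(msg)
	return err
}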

The consumer

package consumer

import (
	"github.com/HappyTeemo7569/teemoKit/tlog"
	"github.com/Shopify/sarama"
	"kafkaDemo/define"
)

var (
	ConsumerId = 1
)

type Consumer struct {
	Consumer   sarama.Consumer
	Topic      string
	ConsumerId int // Consumer Id
}

func (c *Consumer) InitConsumer() error {
	consumer, err := sarama.NewConsumer([]string{define.SERVER_LIST}, nil)
	if err != nil {
		return err
	}
	c.Consumer = consumer
	c.Topic = define.TOPIC
	c.ConsumerId = ConsumerId
	ConsumerId++
	return nil
}

// Consume a specific partition.
// An offset can be specified; pass -1 to start from the newest offset.
func (c *Consumer) GetMessage(partitionId int32, offset int64) {
	if offset == -1 {
		offset = sarama.OffsetNewest
	}
	pc, err := c.Consumer.ConsumePartition(c.Topic, partitionId, offset)
	if err != nil {
		tlog.Error("failed to start consumer for partition %d,err:%v", partitionId, err)
		// A typical error here is "That topic/partition is already being consumed".
		return
	}

	// Consume information asynchronously from each partition
	go func(pc sarama.PartitionConsumer) {
		for msg := range pc.Messages() {
			tlog.Info("ConsumerId:%d Partition:%d Offset:%d Key:%v Value:%v", c.ConsumerId, msg.Partition, msg.Offset, msg.Key, string(msg.Value))
		}
	}(pc)
}

// Iterate over all partitions
func (c *Consumer) GetMessageToAll(offset int64) {
	partitionList, err := c.Consumer.Partitions(c.Topic) // Get all partitions for the topic
	if err != nil {
		tlog.Error("fail to get list of partition:err%v", err)
		return
	}
	tlog.Info("All of the partition.", partitionList)

	for _, partition := range partitionList { // Iterate over all partitions
		c.GetMessage(partition, offset)
	}
}


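The library list at the top mentions github.com/bsm/sarama-cluster for consumer groups, but the demo only uses plain partition consumers, so every consumer reads every message. Below is a minimal sketch of group-based consumption with sarama-cluster; it is not taken from the demo, and the broker list, group ID, and topics are whatever you pass in:

import cluster "github.com/bsm/sarama-cluster"

// ConsumeAsGroup sketches consumer-group consumption: consumers that share a
// group ID split the topic's partitions between them instead of each reading
// everything.
func ConsumeAsGroup(brokers []string, groupID string, topics []string) error {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true

	c, err := cluster.NewConsumer(brokers, groupID, topics, config)
	if err != nil {
		return err
	}
	defer c.Close()

	for msg := range c.Messages() {
		tlog.Info("Partition:%d Offset:%d Value:%s", msg.Partition, msg.Offset, string(msg.Value))
		c.MarkOffset(msg, "") // commit the offset so the group does not re-read this message
	}
	return nil
}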

The main function

func main() {
	tlog.Info("Start")

	go producer.Put()
	go consumer.Get()

	for {
		time.Sleep(time.Hour * 60)
	}
}

func Put() {
	producer := new(Producer)
	producer.InitProducer()
	go func() {
		for {
			producer.SendMessage()
			time.Sleep(1 * time.Second)
		}
	}()
}

func Get() {

	offset := int64(0)

	consumer := new(Consumer)
	err := consumer.InitConsumer()
	if err != nil {
		tlog.Error("fail to init consumer, err:%v", err)
		return
	}
	consumer.GetMessageToAll(offset)
}


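The main function above blocks by sleeping in an endless loop. A common alternative is to block on an OS signal so the process can be stopped cleanly with Ctrl+C. The sketch below assumes the demo's packages live at kafkaDemo/producer and kafkaDemo/consumer (the import paths are an assumption based on the post):

package main

import (
	"os"
	"os/signal"
	"syscall"

	"github.com/HappyTeemo7569/teemoKit/tlog"

	"kafkaDemo/consumer"
	"kafkaDemo/producer"
)

func main() {
	tlog.Info("Start")

	go producer.Put()
	go consumer.Get()

	// Block until SIGINT or SIGTERM instead of sleeping forever.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	tlog.Info("Shutting down")
}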

The full source code is available at: https://github.com/HappyTeemo7569/kafkaDemo