Hello everyone, my name is Xie Wei, and I am a programmer.
Today’s topic: Getting started with Kafka – Cluster Edition
Prerequisite review: Kafka single node
1. Basic concepts
Most messaging systems share a similar set of concepts. When you are new to them, the concepts can be hard to grasp at first, so it is worth going back and clarifying the basics as your learning progresses.
Here are the basic concepts in question and answer format:
- What is a broker?
Simply put, a Kafka server is a broker.
- What is a producer?
Simply put, the system that produces (sends) messages is called a producer.
- What is a consumer?
Simply put, the system that processes (consumes) messages is called a consumer.
- What is a topic?
To put it simply, a topic is a logical concept: it distinguishes different types of messages and gives them human-readable names.
- What is a partition?
In simple terms, a partition is the entity that actually stores messages: a topic is divided into one or more partitions. At the physical level, each partition is a folder named topic-n that holds the message log. Partitions can sit on the same broker, or on different brokers if you use the clustered version of Kafka.
topic-go-0
topic-go-1
topic-go-2
...
- What is offset?
In simple terms, it is a position number used to track consumption progress. For example, if I send you 100 messages, how do I know which one you have consumed up to? The offset marks that position.
- What is a consumer group?
In simple terms, a consumer group is a group of consumers that collectively consume one or more topics; each consumer in the group consumes messages from one or more partitions. Why have both consumers and consumer groups? A consumer consumes messages and needs to subscribe to a topic; a consumer group consumes one or more topics collectively, which provides scalability and fault tolerance. Scaling out means a newly added consumer takes over part of the work and reduces the load on the others; likewise, if a consumer is removed, its partitions are reassigned to the remaining consumers. This reassignment mechanism is called Rebalance in Kafka.
So when does a Rebalance happen?
- Changes in the number of consumers
- Changes in the subscribed topics
- Changes in the number of partitions
Changes in the number of consumers are the most common scenario. Rebalance has pros and cons: it is what gives you scalability and fault tolerance, but while a rebalance is in progress the group stops consuming messages.
- What is a Kafka cluster?
Simply put, a cluster is a collection of services; its defining characteristic is multiple machines running multiple services. This is what gives the system high availability and high concurrency. Internally, the brokers discover each other through ZooKeeper and metadata; externally, the cluster is used as if it were a single service.
- How do you control these “capabilities”?
Through configuration. For example: how does the producer make sure a message is sent reliably; with multiple partitions, which partitioning strategy is used; should the producer compress messages, and with which algorithm; does the consumer start from the latest or the earliest messages; which Rebalance strategy does the consumer group use?
I call these characteristics “capabilities”; to use Kafka well you need to be familiar with them, or at least be able to analyze them case by case.
- Configuration of broker capabilities
- Configuration of producer “capabilities”
- Configuration of consumer “capabilities”
- Configuration of consumer group capabilities
2. Configuration
Capabilities are granted through a configuration file read at startup. This is how most services work: MySQL, Redis, and so on are all configured at startup to give them their capabilities.
broker
# directory
config/server.properties
- log.dirs: the message storage directory; there can be more than one
log.dirs=/kafka/kafka-logs-kfk1
- zookeeper.connect: the ZooKeeper connection string; can list multiple addresses, used in cluster mode
zookeeper.connect=zookeeper-1:2181
- advertised.listeners: the address advertised to external clients
advertised.listeners=PLAINTEXT://kfk1:9092
- listener.security.protocol.map: the mapping of listener names to security protocols
listener.security.protocol.map=CONTROLLER:PLAINTEXT
In general these settings are enough and the rest can stay at their defaults; log.dirs and zookeeper.connect are the most important.
topic
- auto.create.topics.enable: whether topics may be created automatically
auto.create.topics.enable=false
After the service is started, the remaining settings are completed in code through the client library.
In Go, the Kafka client used here is sarama:
type Config struct {
    Producer struct{ ... }
    Consumer struct {
        ...
        Group struct{ ... }
    }
    ...
}
- Configure consumers via config.Consumer
- Configure producers via config.Producer
- Configure the consumer group via config.Consumer.Group
Consumer:
c.Consumer.Fetch.Min = 1
c.Consumer.Fetch.Default = 1024 * 1024
c.Consumer.Retry.Backoff = 2 * time.Second
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3
Usually the defaults are fine; the ones you may want to set explicitly are:
- Whether errors are returned: c.Consumer.Return.Errors
- The starting offset: c.Consumer.Offsets.Initial
- The retry mechanism: c.Consumer.Retry
Producers:
// Maximum message size, about 1 MB
c.Producer.MaxMessageBytes = 1000000
// Acks: 0: no acknowledgement; 1: the leader has received the message
c.Producer.RequiredAcks = WaitForLocal
c.Producer.Timeout = 10 * time.Second
// Partition strategy: random, round-robin, hash
c.Producer.Partitioner = NewHashPartitioner
// Retry mechanism
c.Producer.Retry.Max = 3
c.Producer.Retry.Backoff = 100 * time.Millisecond
c.Producer.Return.Errors = true
// Compression algorithm: gzip, zstd, lz4, snappy
c.Producer.CompressionLevel = CompressionLevelDefault
Consumer Group:
// Session timeout
c.Consumer.Group.Session.Timeout = 10 * time.Second
// Heartbeat interval
c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
// Rebalance strategy
c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange
c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
c.Consumer.Group.Rebalance.Retry.Max = 4
c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
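The values above mirror sarama's defaults from NewConfig. In your own code you typically start from NewConfig and override only the fields you care about. A minimal sketch, assuming the github.com/Shopify/sarama import path; every concrete value below is chosen purely for illustration:

package main

import (
    "time"

    "github.com/Shopify/sarama"
)

// newConfig starts from sarama's defaults and overrides only the capabilities we care about.
func newConfig() *sarama.Config {
    cfg := sarama.NewConfig()

    // Producer "capabilities"
    cfg.Producer.RequiredAcks = sarama.WaitForAll       // wait for all in-sync replicas to ack
    cfg.Producer.Compression = sarama.CompressionSnappy // compress message batches with snappy
    cfg.Producer.Retry.Max = 3
    cfg.Producer.Return.Successes = true // required by the sync producer

    // Consumer "capabilities"
    cfg.Consumer.Return.Errors = true
    cfg.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the earliest message

    // Consumer group "capabilities"
    cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    cfg.Consumer.Group.Session.Timeout = 20 * time.Second

    return cfg
}

func main() {
    _ = newConfig() // pass the config to a producer, consumer, or consumer group
}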
3. Consumer group
An ordinary consumer usually has to specify the topic, partition, and offset in order to consume:
Such as:
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
brokers := []string{"127.0.0.1:9092"}
master, err := sarama.NewConsumer(brokers, config)
consumer, err := master.ConsumePartition("topic-python", 0, sarama.OffsetNewest)
Among them:
ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
- topic
- partition
- offset
However, in this form the partition and offset must be specified explicitly, which is inconvenient, so the consumer group form is generally used instead.
type KafkaConsumerGroupAction struct {
group sarama.ConsumerGroup
}
func NewKafkaConsumerGroupAction(brokers []string, groupId string) *KafkaConsumerGroupAction {
config := sarama.NewConfig()
sarama.Logger = log.New(os.Stdout, "[consumer_group]", log.Lshortfile)
// Rebalance strategy
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
config.Consumer.Group.Session.Timeout = 20 * time.Second
config.Consumer.Group.Heartbeat.Interval = 6 * time.Second
config.Consumer.IsolationLevel = sarama.ReadCommitted
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Version = sarama.V2_3_0_0
consumerGroup, e := sarama.NewConsumerGroup(brokers, groupId, config)
if e != nil {
log.Println(e)
return nil
}
return &KafkaConsumerGroupAction{group: consumerGroup}
}
func (K *KafkaConsumerGroupAction) Consume(topics []string, wg *sync.WaitGroup, ctx context.Context) {
var consumer = KafkaConsumerGroupHandler{ready: make(chan bool)}
go func() {
defer wg.Done()
for {
if err := K.group.Consume(ctx, topics, &consumer); err != nil {
log.Panicf("Error from consumer: %v", err)
}
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
<-consumer.ready
log.Println("Sarama consumer up and running! ...")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
case <-sigterm:
log.Println("terminating: via signal")
}
wg.Wait()
if err := K.group.Close(); err != nil {
log.Panicf("Error closing client: %v", err)
}
}
type KafkaConsumerGroupHandler struct {
ready chan bool
}
func (K *KafkaConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
// Mark the handler as ready so Consume can unblock on <-consumer.ready
close(K.ready)
return nil
}
func (K *KafkaConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
func (K *KafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s, partions = %d, offset = %d".string(message.Value), message.Timestamp, message.Topic, message.Partition, message.Offset)
lag := claim.HighWaterMarkOffset() - message.Offset
fmt.Println(lag)
session.MarkMessage(message, "")
}
return nil
}
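To tie the pieces together, here is a minimal usage sketch. It assumes the types above (with Consume taking a *sync.WaitGroup) plus the usual context and sync imports; the broker addresses, group id, and topic name are placeholders:

func main() {
    brokers := []string{"kfk1:9092", "kfk2:29092", "kfk3:39092"}
    action := NewKafkaConsumerGroupAction(brokers, "group-go")
    if action == nil {
        return // construction failed; the error was already logged
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var wg sync.WaitGroup
    wg.Add(1) // matched by the wg.Done() inside Consume's goroutine
    // Blocks until the context is cancelled or SIGINT/SIGTERM is received
    action.Consume([]string{"topic-go"}, &wg, ctx)
}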
Consumer Group:
type ConsumerGroup interface {
Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
Errors() <-chan error
Close() error
}
Among them:
type ConsumerGroupHandler interface {
Setup(ConsumerGroupSession) error
Cleanup(ConsumerGroupSession) error
ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
For real message processing, you need to implement the ConsumerGroupHandler interface.
4. General processing flow for producers
If these concepts are clear to you, what’s the challenge of using Kafka in general?
- How do I ensure that messages are delivered correctly
- How do I ensure that I do not re-consume messages
- How do you keep consumers from lagging: ideally, as soon as the producer sends a message, a consumer consumes it, with no delay
- How to ensure high availability of the system
The general flow for a producer:
- Producer configuration
- Instantiate producer
- Build the message
- Send a message
- Close the producer instance
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
client, err := NewClient(addrs, conf)
if err != nil {
return nil, err
}
return newAsyncProducer(client)
}
// Asynchronous producer
type AsyncProducer interface {
AsyncClose()
Close() error
Input() chan<- *ProducerMessage // Send a message
Successes() <-chan *ProducerMessage
Errors() <-chan *ProducerError
}
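Following those steps (configure, instantiate, build, send, close), a minimal async-producer sketch; the broker address and topic name are placeholders, success/error handling is reduced to a single select, and the sarama import is assumed:

func produce() error {
    cfg := sarama.NewConfig()
    cfg.Producer.Return.Successes = true
    cfg.Producer.Return.Errors = true

    // Instantiate the producer
    producer, err := sarama.NewAsyncProducer([]string{"kfk1:9092"}, cfg)
    if err != nil {
        return err
    }
    defer producer.AsyncClose()

    // Build the message
    msg := &sarama.ProducerMessage{
        Topic: "topic-go",
        Value: sarama.StringEncoder("hello kafka cluster"),
    }

    // Send it and wait for either a success or an error
    producer.Input() <- msg
    select {
    case <-producer.Successes():
        return nil
    case perr := <-producer.Errors():
        return perr.Err
    }
}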
5. General processing flow for consumers
The general flow for a consumer:
- Consumer configuration
- Instantiate consumer
- Subscribe to the topic
- Commit the offset
- Close consumer
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
client, err := NewClient(addrs, config)
if err != nil {
return nil, err
}
return newConsumer(client)
}
type Consumer interface {
Topics() ([]string, error) // topics
Partitions(topic string) ([]int32, error) // partitions
ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) // Consume messages
HighWaterMarks() map[string]map[int32]int64 // high water marks
Close() error
}
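Putting the interface to use, a minimal read-loop sketch; the broker address, topic name, and partition are placeholders, error handling is reduced to log.Fatal, and the sarama and log imports are assumed:

consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, sarama.NewConfig())
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

// Consume partition 0 of the topic, starting from the oldest retained message
pc, err := consumer.ConsumePartition("topic-go", 0, sarama.OffsetOldest)
if err != nil {
    log.Fatal(err)
}
defer pc.Close()

for msg := range pc.Messages() {
    // Each message carries its topic, partition, and offset, so progress can be tracked
    log.Printf("topic=%s partition=%d offset=%d value=%s",
        msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
}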
6. General processing flow for consumer groups
An ordinary consumer has to specify the partition and offset in order to consume, so it is not commonly used; the consumer group is generally chosen instead.
So what is the general process for the consumer group?
- Configure the consumer group
- Instantiate the consumer group, specifying the topics and the group ID
- Consume messages
- Close the consumer group
type ConsumerGroup interface {
Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
Errors() <-chan error
Close() error
}
Consumer group processor:
type ConsumerGroupHandler interface {
Setup(ConsumerGroupSession) error
Cleanup(ConsumerGroupSession) error
ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
7. The cluster
As mentioned above, one of the characteristics of clustering is: multiple machines, multiple services.
In a real production environment, the ZooKeeper nodes and the Kafka servers are deployed on separate machines; together they form one system serving production traffic.
For individual learning, you can simulate the cluster effect on a single machine by distinguishing the services with different ports.
Of course you can configure ZooKeeper and Kafka locally. But I generally prefer the container approach, which is easy to deploy.
- Multi-node zookeeper
zookeeper-1:
image: zookeeper
restart: always
hostname: zookeeper-1
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
volumes:
- /local/volumn/zookeeper1/data:/data
- /local/volumn/zookeeper1/datalog:/datalog
zookeeper-2:
image: zookeeper
restart: always
hostname: zookeeper-2
ports:
- "2182:2181"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
volumes:
- /local/volumn/zookeeper2/data:/data
- /local/volumn/zookeeper2/datalog:/datalog
zookeeper-3:
image: zookeeper
restart: always
hostname: zookeeper-3
ports:
- "2183:2181"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
volumes:
- /local/volumn/zookeeper3/data:/data
- /local/volumn/zookeeper3/datalog:/datalog
The most important parts are these environment variables:
- ZOO_MY_ID: the server's myid, a number
- ZOO_SERVERS: the list of servers in the ensemble, each in the form below
server.A=B:C:D
- A indicates the myid, i.e. the server id
- B indicates the IP address of the server
- C indicates the port through which the server exchanges information with the leader server in the cluster
- D indicates the port through which the servers communicate during the election
Some people ask: how do I know which environment variables exist and what they do?
Check out the documentation on Docker Hub:
Zookeeper Docker Hub documentation: hub.docker.com/_/zookeeper
- Multi-node Kafka (Kafka Docker Hub address: hub.docker.com/r/wurstmeis…)
kfk1:
image: index.docker.io/wurstmeister/kafka:latest
container_name: kfk1
hostname: kfk1
restart: always
ports:
- "9092:9092"
- "19999:9999"
expose:
- 19092
links:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk1:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
JMX_PORT: 9999
volumes:
- /local/volumn/kfk1:/kafka/kafka-logs-kfk1
kfk2:
image: index.docker.io/wurstmeister/kafka:latest
container_name: kfk2
hostname: kfk2
restart: always
ports:
- "29092:29092"
- "29999:9999"
expose:
- 29092
links:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk2:29092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092
JMX_PORT: 9999
volumes:
- /local/volumn/kfk2:/kafka/kafka-logs-kfk2
kfk3:
image: index.docker.io/wurstmeister/kafka:latest
container_name: kfk3
hostname: kfk3
restart: always
ports:
- "39092:39092"
- "39999:9999"
expose:
- 39092
links:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk3:39092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092
JMX_PORT: 9999
volumes:
- /local/volumn/kfk3:/kafka/kafka-logs-kfk3
The most important parts are the following environment variables:
- KAFKA_BROKER_ID: the broker.id; on a single node the default is -1
- KAFKA_ZOOKEEPER_CONNECT: the ZooKeeper connection address, matching the ZooKeeper services above
- KAFKA_ADVERTISED_LISTENERS: the address and port advertised to external clients
- KAFKA_LISTENERS: the address and port the broker listens on, e.g. PLAINTEXT://0.0.0.0:39092
- Monitoring node (kafka-manager Docker Hub address: hub.docker.com/r/sheepkill…)
ui:
image: index.docker.io/sheepkiller/kafka-manager:latest
restart: always
links:
- zookeeper-1
- zookeeper-2
- zookeeper-3
- kfk1
- kfk2
- kfk3
ports:
- "9000:9000"
environment:
ZK_HOSTS: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_BROKERS: kfk1:19092,kfk2:29092,kfk3:39092
Environment variables:
- ZK_HOSTS: the addresses of the ZooKeeper nodes
- KAFKA_BROKERS: the addresses of the Kafka brokers
Start everything up:
docker-compose -f docker-compose.yml up -d
The cluster version of the Kafka service is used in basically the same way as the single-node version, but the system is more robust and highly available: with redundant backups, the failure of one node does not affect the service unless all nodes fail.
- Backups (replicas): when creating a topic, the number of backups must be less than or equal to the number of Kafka nodes. With three nodes and two backups, each partition's two copies may land on any two of the three nodes.
- Partitions: on a single node, all of a topic's partitions sit under the same data directory; in the cluster version, the partitions are spread roughly evenly across the cluster nodes.
From the outside, the cluster is used exactly like a single node.
For example, topic-go with 10 partitions and 2 backups has 20 partition replicas in total, stored across the 3 nodes as 6, 7, and 7.
Possible problems with the cluster version:
- If you have disabled automatic topic creation, remember to create the topic manually first (see the sketch after this list)
- The cluster address cannot be reached: 1) set /etc/hosts; 2) open the ports, especially on cloud servers
- Consumer lag keeps growing: add more consumer instances
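For the first point, one way to create the topic up front is sarama's ClusterAdmin. A minimal sketch, where the broker address, topic name, and counts are placeholders, the sarama and log imports are assumed, and the broker is assumed to support the CreateTopics API (hence config.Version):

cfg := sarama.NewConfig()
cfg.Version = sarama.V2_3_0_0 // CreateTopics requires a sufficiently new broker/protocol version

admin, err := sarama.NewClusterAdmin([]string{"kfk1:9092"}, cfg)
if err != nil {
    log.Fatal(err)
}
defer admin.Close()

// 10 partitions and 2 replicas, matching the topic-go example above
err = admin.CreateTopic("topic-go", &sarama.TopicDetail{
    NumPartitions:     10,
    ReplicationFactor: 2,
}, false)
if err != nil {
    log.Fatal(err)
}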
Reference:
- Github.com/ErikJiang/k…
Code Address:
- Github.com/wuxiaoxiaos…