Hello everyone, my name is Xie Wei, I am a programmer.

Today’s topic: Getting started with Kafka – Cluster Edition

Prerequisite review: Kafka single node

1. Basic concepts

In the message system, the concepts involved are relatively similar. When you are new to the message system, sometimes you do not understand the concepts well, so you need to go back and clarify the basic concepts according to your own learning progress.

Here are the basic concepts in question and answer format:

  1. What is a broker?

Simply put, a Kafka server is a broker.

  1. What is a producer?

Simply put, the system that provides the message is called a producer

  1. What is a consumer?

Simply put, the system that processes the message is called the consumer

  1. What is topic?

To put it simply, topic is a logical concept that distinguishes between different types of messages and gives them artificial names.

  1. What is a partition?

In simple terms, it is the entity that stores the message, dividing the topic into different partitions. At the physical level, this is a folder named topic-n that stores message logs. Partitions can be on the same broker or on different brokers if you use the clustered version of Kafka.

topic-go-0
topic-go-1
topic-go-2
...
Copy the code
  1. What is offset?

In simple terms, it’s a displacement number. It’s used to mark consumers. For example, if I send you 100 messages, how can I know which number you have consumed? Offset is used to mark.

  1. What is a consumer group?

In simple terms, a group of consumers collectively consume one or more topics, and of course one consumer consumes messages within one or more partitions. Why consumers and consumer groups? Consumers consume messages and need to subscribe to a topic. Consumer groups collectively consume one or more topics, which can result in extensibility and fault tolerance. Expansion refers to that a new consumer can undertake part of the task and reduce the burden of other consumers; Similarly, reduce one consumer and reassign messages to the consumer. This allocation mechanism is called Rebalance in Kafka.

So when is Rebalance?

  • Changes in the number of consumers
  • The change of the topic
  • Zoning change

Changes in the number of consumers are the most common scenario. Rebalance has pros and cons: scalable and fault-tolerant. Rebalance. It stops consuming messages at some point.

  1. What is a Kafka cluster?

Simply put, a cluster is a collection of services, a typical feature is: multiple machines, multiple services. This feature can guarantee the high availability and high concurrency of the system. Internal systems can discover each other using ZooKeeper and Metadata. Externally, it’s like using a single service.

  1. How to control the size of the “ability”?

Configuration files, for example, how do I ensure that the producer sends the message accurately, for example, multiple partitions, what partitioning policy do I follow, for example, should the producer’s message be compressed, what compression method should be adopted? For example, whether the consumer is the latest consumption or the oldest news consumption; What is the consumer group’s Rebalance strategy?

These characteristics, I call them capabilities, capabilities that the user needs to be familiar with in order to perform, or to be able to analyze on a case-by-case basis.

  • Configuration of broker capabilities
  • Configuration of producer “capabilities”
  • Configuration of consumer “capabilities”
  • Configuration of consumer group capabilities

2. The configuration

The configuration file when the service is started, which is also the general way for the startup of most services, such as MySQL database service, Redis service, etc., are all configured at startup to give them the ability.

broker

# directory
config/server.properties
Copy the code
  • Log.dirs message storage directory, you can have more than one
log.dirs=/kafka/kafka-logs-kfk1
Copy the code
  • Zookeeper. connect, can be multiple, used in cluster mode
zookeeper.connect=zookeeper-1:2181
Copy the code
  • Advertised. External address
advertised.listeners=PLAINTEXT://kfk1:9092
Copy the code
  • Listener. Security. The protocol. The map security protocols
listener.security.protocol.map=CONTROLLER:PLAINTEXT
Copy the code

In general, these configurations are ok, and the others are default, of which log.dirs and zookeeper.connect are the most important

topic

  • Auto. Create. Switchable viewer. Enable whether to allow automatically create topic
auto.create.topics.enable=false
Copy the code

After starting the service, the client tool is used to write code to complete the corresponding Settings.

In go, the Kafka client uses sarama

type config struct {
    Producer struct{... } Consumerstruct{... Groupstruct{... }}}Copy the code
  • Configure config.consumer for consumers
  • Configure config.producer for producers
  • Configure Config.consumer.group for the Consumer Group

Consumer:

	c.Consumer.Fetch.Min = 1
	c.Consumer.Fetch.Default = 1024 * 1024
	c.Consumer.Retry.Backoff = 2 * time.Second
	c.Consumer.MaxWaitTime = 250 * time.Millisecond
	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
	c.Consumer.Return.Errors = false
	c.Consumer.Offsets.CommitInterval = 1 * time.Second
	c.Consumer.Offsets.Initial = OffsetNewest
	c.Consumer.Offsets.Retry.Max = 3
Copy the code

Where, generally default, otherwise configure:

  • C. Consumer.return. Errors
  • Consumption starting value: Arthur c. onsumer. Offsets. Initial
  • Retry mechanism: Retry

Producers:

/ / message to a maximum of about 1 MB c.P roducer. MaxMessageBytes = 1000000 / whether/message reply: 0: don't reply, disabled; 1: the leader receives the message. RequiredAcks = WaitForLocal c.producer.timeout = 10 * time.Second // Partition policy: random, polling,hashC. producer.partitioner = NewHashPartitioner // Retry mechanism C. producer.retry.max = 3 C. producer.retry.backoff = 100 * time.Millisecond c.Producer.Return.Errors =true/ / compression algorithm: gzip, ZSTD lz4, snappy pressionLevel = CompressionLevelDefault at c.Producer.ComCopy the code

Consumer Group:

. / / Interval Arthur c. onsumer. Group. The Session Timeout = 10 * time. The Second / / Heartbeat Arthur c. onsumer. Group. The Heartbeat. The Interval = 3 * time. The Second / / Arthur c. Rebalance Strategy onsumer. Group. Rebalance. Strategy = BalanceStrategyRange Arthur c. onsumer. Group. Rebalance. Timeout = 60 * time.Second c.Consumer.Group.Rebalance.Retry.Max = 4 c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.SecondCopy the code

3. Consumer group

Ordinary consumers usually need to specify topic, offset to specify consumption:

Such as:

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	brokers := []string{"127.0.0.1:9092"}
	master, err := sarama.NewConsumer(brokers, config)
    consumer, err := master.ConsumePartition("topic-python".0, sarama.OffsetNewest)

Copy the code

Among them:

ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
Copy the code
  • topic
  • partition
  • offset

However, in this form, offset needs to be specified, which is not convenient to use. Therefore, the form of consumer group is generally used.

type KafkaConsumerGroupAction struct {
	group sarama.ConsumerGroup
}

func NewKafkaConsumerGroupAction(brokers []string, groupId string) *KafkaConsumerGroupAction {
	config := sarama.NewConfig()
	sarama.Logger = log.New(os.Stdout, "[consumer_group]", log.Lshortfile)
	// Rebalance strategy
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	config.Consumer.Group.Session.Timeout = 20 * time.Second
	config.Consumer.Group.Heartbeat.Interval = 6 * time.Second
	config.Consumer.IsolationLevel = sarama.ReadCommitted
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Version = sarama.V2_3_0_0
	consumerGroup, e := sarama.NewConsumerGroup(brokers, groupId, config)
	ife ! =nil {
		log.Println(e)
		return nil
	}
	return &KafkaConsumerGroupAction{group: consumerGroup}

}

func (K *KafkaConsumerGroupAction) Consume(topics []string, wg sync.WaitGroup, ctx context.Context) {
	var consumer = KafkaConsumerGroupHandler{ready: make(chan bool)}
	go func(a) {
		defer wg.Done()
		for {
			iferr := K.group.Consume(ctx, topics, &consumer); err ! =nil {
				log.Panicf("Error from consumer: %v", err)
			}
			ifctx.Err() ! =nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()
	<-consumer.ready
	log.Println("Sarama consumer up and running! ...")
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	wg.Wait()
	iferr := K.group.Close(); err ! =nil {
		log.Panicf("Error closing client: %v", err)
	}
}

type KafkaConsumerGroupHandler struct {
	ready chan bool
}

func (K *KafkaConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (K *KafkaConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
func (K *KafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s, partions = %d, offset = %d".string(message.Value), message.Timestamp, message.Topic, message.Partition, message.Offset)
		lag := claim.HighWaterMarkOffset() - message.Offset
		fmt.Println(lag)
		session.MarkMessage(message, "")}return nil
}
Copy the code

Consumer Group:

type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
	Errors() <-chan error

	Close() error
}
Copy the code

Among them:

type ConsumerGroupHandler interface {

	Setup(ConsumerGroupSession) error

	Cleanup(ConsumerGroupSession) error

	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
Copy the code

For real message processing, you need to implement the ConsumerGroupHandler interface.

4. General processing process of the producer

If these concepts are clear to you, what’s the challenge of using Kafka in general?

  • How do I ensure that messages are delivered correctly
  • How do I ensure that I do not re-consume messages
  • How do you ensure that messages don’t lag, preferably when producers send messages to the messaging system and consumers consume them immediately, without delay
  • How to ensure high availability of the system
  1. Producer allocation
  2. Instantiate producer
  3. Build the message
  4. Send a message
  5. Closing producer instances
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	iferr ! =nil {
		return nil, err
	}
	return newAsyncProducer(client)
}
Copy the code
// Asynchronous producer
type AsyncProducer interface {

	AsyncClose()
	Close() error
	Input() chan<- *ProducerMessage // Send a message
	Successes() <-chan *ProducerMessage
	Errors() <-chan *ProducerError
}
Copy the code

5. General processing process of consumers

General processing process of consumers:

  1. Consumer configuration
  2. Instantiate consumer
  3. Subscribe to the topic
  4. Submit the displacement
  5. Close consumer
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
	client, err := NewClient(addrs, config)
	iferr ! =nil {
		return nil, err
	}
	return newConsumer(client)
}
Copy the code
type Consumer interface {

	Topics() ([]string, error) / / message
	Partitions(topic string) ([]int32, error) / / partition
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) // Consume messages
	HighWaterMarks() map[string]map[int32]int64 / / high water level

	Close() error
}
Copy the code

6. General processing process of consumer group

Ordinary consumers, need to specify partition and displacement, consumption, not commonly used. Generally select the consumer group.

So what is the general process for the consumer group?

  1. Configuring consumer Groups
  2. Example call consumer group, specify topic, specify consumer group GroupID
  3. News consumption
  4. Close consumer Group
type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
	Errors() <-chan error

	Close() error
}
Copy the code

Consumer group processor:

type ConsumerGroupHandler interface {

	Setup(ConsumerGroupSession) error
	Cleanup(ConsumerGroupSession) error
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
Copy the code

7. The cluster

As mentioned above, one of the characteristics of clustering is: multiple machines, multiple services.

In a real online environment, ZooKeeper is deployed on different machines and Kafka Server is deployed on different machines, which constitute a system and serve the online system together.

Individual learning, in order to achieve cluster effect, that is: use different ports can be distinguished.

Of course you can configure ZooKeeper and Kafka locally. But I generally prefer the container approach, which is easy to deploy.

  • Multi-node zookeeper
  zookeeper-1:
    image: zookeeper
    restart: always
    hostname: zookeeper-1
    ports:
      - 2181: 2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: Server. 1 = 0.0.0.0:2888-3888; 2181 server.2=zookeeper-2:2888:3888; 2181 server.3=zookeeper-3:2888:3888; 2181
    volumes:
      - /local/volumn/zookeeper1/data:/data
      - /local/volumn/zookeeper1/datalog:/datalog
  zookeeper-2:
    image: zookeeper
    restart: always
    hostname: zookeeper-2
    ports:
      - 2182: 2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888; 2181 Server. 2 = 0.0.0.0:2888-3888; 2181 server.3=zookeeper-3:2888:3888; 2181
    volumes:
      - /local/volumn/zookeeper2/data:/data
      - /local/volumn/zookeeper2/datalog:/datalog
  zookeeper-3:
    image: zookeeper
    restart: always
    hostname: zookeeper-3
    ports:
      - 2183: 2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888; 2181 server.2=zookeeper-2:2888:3888; 2181 Server. 3 = 0.0.0.0:2888-3888; 2181
    volumes:
      - /local/volumn/zookeeper3/data:/data
      - /local/volumn/zookeeper3/datalog:/datalog
Copy the code

The most important of these are environment variables:

ZOO_MY_ID usually indicates the myID with a numberCopy the code
ZOO_SERVERS
Copy the code

Server. A=B:C:D

  • A indicates myID, which indicates the server id
  • B indicates the IP address of the server
  • C indicates the port through which the server exchanges information with the leader server in the cluster
  • D indicates the port through which the servers communicate during the election

Some people say, I don’t know what to do with these environment variables, I don’t know what the environment variables are.

Check out the documentation on Docker Hub:

Zookeeper Docker Hub documentation: hub.docker.com/_/zookeeper

  • Multi-node kafka: (kafka docker hub address: hub.docker.com/r/wurstmeis…).
  kfk1:
    image: index.docker.io/wurstmeister/kafka:latest
    container_name: kfk1
    hostname: kfk1
    restart: always
    ports:
      - 9092: 9092
      - 19999: 9999
    expose:
      - 19092
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk1:9092
      KAFKA_LISTENERS: PLAINTEXT: / / 0.0.0.0:9092
      JMX_PORT: 9999
    volumes:
      - /local/volumn/kfk1:/kafka/kafka-logs-kfk1
  kfk2:
    image: index.docker.io/wurstmeister/kafka:latest
    container_name: kfk2
    hostname: kfk2
    restart: always
    ports:
      - 29092: 29092
      - 29999: 9999
    expose:
      - 29092
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk2:29092
      KAFKA_LISTENERS: PLAINTEXT: / / 0.0.0.0:29092
      JMX_PORT: 9999
    volumes:
      - /local/volumn/kfk2:/kafka/kafka-logs-kfk2
  kfk3:
    image: index.docker.io/wurstmeister/kafka:latest
    container_name: kfk3
    hostname: kfk3
    restart: always
    ports:
      - 39092: 39092
      - 39999: 9999
    expose:
      - 39092
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk3:39092
      KAFKA_LISTENERS: PLAINTEXT: / / 0.0.0.0:39092
      JMX_PORT: 9999
    volumes:
      - /local/volumn/kfk3:/kafka/kafka-logs-kfk3
Copy the code

The most important of these are the following environment variables:

KAFKA_BROKER_ID Broker.id For a single node, the default value is -1Copy the code
KAFKA_ZOOKEEPER_CONNECT Kafka ZooKeeper connection address, which corresponds to the external address of ZooKeeperCopy the code
KAFKA_ADVERTISED_LISTENERS External address and port number of the nodeCopy the code
KAFKA_LISTENERS: PLAINTEXT: / / 0.0.0.0:39092 foreign access to the node address and portCopy the code
  • Monitoring nodes (kafka – manager docker hub address: hub.docker.com/r/sheepkill…).
  ui:
    image: index.docker.io/sheepkiller/kafka-manager:latest
    restart: always
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - kfk1
      - kfk2
      - kfk3
    ports:
      - 9000: 9000
    environment:
      ZK_HOSTS: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_BROKERS: kfk1:19092,kfk2:29092,kfk3:39092
Copy the code

Environment variables:

ZK_HOSTS Address of the ZooKeeper nodeCopy the code
KAFKA_BROKERS Kafa node addressCopy the code

Activation:

docker-compose -f docker-compose.yml up -d
Copy the code

The cluster version of the kafka service is basically the same as the single-node Kafka service. The cluster version of the system is more robust, high availability, such as redundancy backup, once a node failure does not affect the service, unless all nodes fail.

  • Backup:

Create topic, the number of backups is less than or equal to the number of Kafka nodes. So three nodes, two backups, maybe any two of the three nodes.

  • partition

Single node, topic partition, all under the same folder; In the cluster version, partitions can be roughly evenly distributed across the cluster nodes

External services are exactly the same as single nodes.

Topic-go 10 partitions, 2 backups: 3 nodes respectively store: 6,7,7 partitions

Possible problems with the cluster version?

  • If you have set not to automatically create a topic, remember to manually create a topic first

  • The cluster access address is abnormal. 1 Set /etc/hosts. 2 Open ports, especially for cloud servers, remember open ports

  • Consumption Lag Lag, how to do? Adding consumer instances

Reference:

  • Github.com/ErikJiang/k…

Code Address:

  • Github.com/wuxiaoxiaos…