In the previous chapter we integrated RabbitMQ with Spring Boot and explained the role of message queues in detail. In this chapter we will learn how Spring Boot integrates Kafka to send messages.

Kafka overview

Kafka is a high-throughput distributed messaging middleware written in Scala and Java. Its high throughput gives it a natural advantage in the big data field, and it is widely used for log collection.

Kafka architecture analysis

Note 1: In the architecture diagram, the red arrows indicate the flow of messages, the blue arrows indicate partition replication, and the green arrows indicate the Kafka cluster registering itself with ZooKeeper.

Note 2: Before Kafka version 0.9, the offsets of consumed messages were recorded in ZooKeeper; from version 0.9 onward they are recorded in an internal Kafka topic.

Kafka terminology:

  1. Producer: the message producer
  2. Consumer: the message consumer
  3. Consumer Group (CG): a consumer group. A topic can be consumed by multiple CGs, but within a CG each partition delivers its messages to only one consumer
  4. Broker: a Kafka server is a broker; one broker can hold multiple topics
  5. Topic: a message topic, used to classify messages; it can be thought of as a queue
  6. Partition: a partition. For scalability, a large topic can be distributed across multiple brokers by dividing it into multiple partitions. Each message within a partition is assigned an ordered ID (its offset)
  7. Offset: Kafka's storage files are named after the offset of the first message they contain, for easy lookup; for example, the first segment file is 00000000000000000000.log
  8. Leader: the primary replica of a partition; each partition has one primary replica and its backups
  9. Follower: a backup replica of a partition, which synchronizes data from the leader

1. Producer partitioning strategy

  1. A partition is specified explicitly.
  2. If no partition is specified but a key is present, the hash of the key is taken modulo the number of partitions of the topic to obtain the partition.
  3. If neither a partition nor a key is specified, the first call generates a random integer (incremented on each subsequent call), and this number modulo the number of partitions of the topic gives the partition. The sketch below shows all three cases.
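
To make the three rules concrete, here is a minimal sketch using the ProducerRecord constructors of the official Kafka client; the topic name, key, and values are illustrative, not from the original article.

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionExamples {
    public static void main(String[] args) {
        // 1. Explicit partition: this record always goes to partition 0 of first-topic.
        ProducerRecord<String, String> r1 =
                new ProducerRecord<>("first-topic", 0, "key", "hello");
        // 2. No partition but a key: partition = hash("key") mod partition count.
        ProducerRecord<String, String> r2 =
                new ProducerRecord<>("first-topic", "key", "hello");
        // 3. Neither partition nor key: a counter with a random starting value,
        //    taken mod the partition count, picks the partition.
        ProducerRecord<String, String> r3 =
                new ProducerRecord<>("first-topic", "hello");
    }
}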

2. Message reliability

Kafka uses an ACK acknowledgment mechanism to guarantee message reliability.

After receiving a message, the leader synchronizes it to the other replicas of the partition, and Kafka sends an ACK only after all replicas have received the message. The drawback of this mode is that if one of the replicas goes down, the producer never receives an ACK from Kafka.

Kafka uses ISR to solve this problem.

ISR (in-sync replica set): a collection of followers, maintained by the leader, that are in sync with the leader.

Once the followers in the ISR finish synchronizing the data, the leader sends an ACK to the producer. If a follower does not synchronize data from the leader for a long time, it is kicked out of the ISR; the time threshold is replica.lag.time.max.ms, which defaults to 10s. When the leader fails, a new leader is elected from the ISR.

Note: before Kafka version 0.9, ISR membership had an additional criterion: the number of messages by which a follower lagged behind the leader.

ACK parameter configuration

0: the producer does not wait for an ACK from the broker.

1: the leader sends an ACK to the producer as soon as it has written the message, without waiting for the followers.

-1 (all): the leader sends an ACK to the producer only after all followers in the ISR have finished synchronizing the message.
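
As a sketch of how this looks in producer code (not from the original article): with the plain Kafka client the acknowledgment level is set through the acks property; in Spring Boot the same setting is exposed as spring.kafka.producer.acks. The broker address below is illustrative.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all"); // "all" is equivalent to -1; "0" and "1" are the other levels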

3. Message consistency problem

Suppose the leader has 10 messages and is synchronizing data to two followers: follower A has synchronized 8 messages and follower B has synchronized 9. If the leader goes down at this point, the data on followers A and B is inconsistent, and the two remaining followers must elect a new leader.

  • LEO (Log End Offset): the offset of the last message in each replica

  • HW (High Watermark): the smallest LEO among all replicas

To ensure data consistency, all followers truncate the part of their log files above the HW and then synchronize data from the new leader. In the example above the HW is 8, so follower B truncates its 9th message and both followers resume synchronizing from the new leader at offset 8.

4. Message duplication problem

Kafka 0.11 introduced a new feature: idempotence. Set enable.idempotence to true in the producer to enable it; when idempotence is enabled, ack defaults to -1.

A producer with idempotence enabled is assigned a PID during initialization, and messages sent to the same partition carry a sequence number. The broker caches <PID, Partition, SeqNumber> and persists only one message for each unique triple. However, the PID changes every time the producer restarts, so idempotence only guarantees that messages within a single partition are not duplicated during a single session.
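
A minimal sketch of enabling idempotence with the plain Kafka client, reusing the props object from the acks sketch above:

// Enable the idempotent producer; the client then also requires acks=all (-1).
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);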

5. Consumer group partition allocation policy

Kafka has two built-in allocation strategies: RoundRobin and Range.

RoundRobin assigns partitions to consumers in a round-robin manner; the precondition is that all consumers in the consumer group subscribe to the same topics.

Range is Kafka's default allocation strategy and works per topic. If a topic has three partitions and the consumer group has two consumers, consumer A is assigned partitions 1 and 2 and consumer B is assigned partition 3.
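
If the default Range strategy is not what you want, the assignor can be switched in the consumer configuration. A sketch with the plain Kafka client, reusing a props object for the consumer; RoundRobinAssignor ships with the official client:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

// Switch the group from the default RangeAssignor to round-robin assignment.
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        RoundRobinAssignor.class.getName());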

6. Consumer offset maintenance

Before Kafka 0.9, consumer offsets were stored in ZooKeeper by default. Starting with version 0.9, offsets are stored in a built-in Kafka topic called __consumer_offsets.
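
For illustration, a sketch of a poll loop with a recent Kafka client, assuming a KafkaConsumer instance named consumer with enable.auto.commit set to false; each commitSync() call writes the group's current offsets to __consumer_offsets:

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset=%d, value=%s%n", record.offset(), record.value());
    }
    consumer.commitSync(); // persists the consumed offsets to __consumer_offsets
}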

7. Producer transactions

To implement transactions that span partitions and sessions, a globally unique Transaction ID is introduced and bound to the PID obtained by the producer. This allows the original PID to be recovered through the Transaction ID when the producer restarts.

To manage Transaction IDs, Kafka introduces a new component, the Transaction Coordinator. The producer obtains the task status corresponding to its Transaction ID by interacting with the Transaction Coordinator.
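
A sketch of the transactional API of the official client (the transactional.id value is illustrative); in spring-kafka the same capability is available through KafkaTemplate's transaction support:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "journey-tx-1"); // illustrative Transaction ID
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // registers the Transaction ID with the Transaction Coordinator
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("first-topic", "message inside a transaction"));
    producer.commitTransaction();
} catch (Exception e) {
    // Fatal errors such as ProducerFencedException require closing the producer instead.
    producer.abortTransaction();
}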

Spring Boot integrates Kafka

1. Introduce the Kafka dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Configure Kafka service information

spring:
  kafka:
    # Kafka server address
    bootstrap-servers: 47.104.155.182:9092
    producer:
      # Serializer for producer message keys
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer for producer message values
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Consumer group
      group-id: test-consumer-group
      # Deserializer for consumer message keys
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer for consumer message values
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3. Consumers

@Component
@Slf4j
@KafkaListener(topics = {"first-topic"}, groupId = "test-consumer-group")
public class Consumer {

    @KafkaHandler
    public void receive(String message) {
        log.info("I am a consumer, and the message I receive is: " + message);
    }
}

4. Producers

@RestController
public class Producer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("send")
    public void send() {

        String message = "Hello, THIS is Java Journey.";
        // The first parameter is the topic
        // The second parameter is the message
        kafkaTemplate.send("first-topic", message);
    }
}
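
After starting the application (assuming Spring Boot's default port 8080), visiting http://localhost:8080/send triggers the producer, and the consumer log should print the received message.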

The sample code for this article has been uploaded to GitHub; give it a star to show your support!

Spring Boot series tutorial directory

Spring-boot-route (1) Several ways for Controller to receive parameters

Spring-boot-route (2) Several methods of reading configuration files

Spring-boot-route (3) Upload multiple files

Spring-boot-route (4) Global exception processing

Spring-boot-route (5) Integrate Swagger to generate interface documents

Spring-boot-route (6) Integrate JApiDocs to generate interface documents

Spring-boot-route (7) Integrate jdbcTemplate operation database

Spring-boot-route (8) Integrating mybatis operation database

Spring-boot-route (9) Integrate JPA operation database

Spring-boot-route (10) Switching between multiple data sources

Spring-boot-route (11) Encrypting database configuration information

Spring-boot-route (12) Integrate Redis as cache

Spring-boot-route (13) Integrate RabbitMQ

Spring-boot-route (14) Integrate Kafka

Spring-boot-route (15) Integrate RocketMQ

Spring-boot-route (16) Use logback to produce log files

Spring-boot-route (17) Use AOP to log operations

Spring-boot-route (18) Spring-boot-actuator monitoring applications

Spring-boot-route (19) Spring-boot-admin Monitoring service

Spring-boot-route (20) Spring Task Implements simple scheduled tasks

Spring-boot-route (21) Quartz Implements dynamic scheduled tasks

Spring-boot-route (22) Enables email sending

Spring-boot-route (23) Develop WeChat official accounts

Spring-boot-route (24) Distributed session consistency processing

Spring-boot-route (25) two lines of code to achieve internationalization

Spring-boot-route (26) Integrate WebSocket

The knowledge in this series of articles is frequently used at work; after learning the series, it is more than enough for coping with daily development. If you want to know more, scan the QR code below and let me know, and I will further improve this series!