A brief summary of the Spring Boot Kafka integration process
Spring Boot Kafka integration
First, add Spring's Kafka support to the pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.4.5.RELEASE</version>
</dependency>
The configuration file
Each setting is explained in the comments inside the configuration itself
spring:
  kafka:
    ##### Producer configuration
    producer:
      # Kafka broker addresses; separate multiple addresses with commas
      bootstrap-servers: ip1:port1,ip2:port2,ip3:port3
      # Number of acknowledgments the producer waits for from the server
      # acks=0: the producer does not wait for any acknowledgment
      # acks=1: the leader acknowledges once it has written the record to its own log, without waiting for the followers
      # acks=all: the leader acknowledges only after all in-sync replicas have received the record
      acks: ...
      # Batch size, in bytes
      batch-size: 15654
      # Total memory the producer can use to buffer records, in bytes
      buffer-memory: 33554432
      client-id: 123
      # Serializer class for keys
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer class for values
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Number of retries after a failed send
      retries: ...
      # Compression type for data generated by the producer.
      # Valid producer values are none, gzip, snappy, lz4 and zstd (default: none).
      # "uncompressed" (no compression) and "producer" (keep the producer's codec)
      # are values of the broker/topic-level compression.type setting.
      compression-type: ...
    ##### Consumer configuration
    consumer:
      # Kafka broker addresses; separate multiple addresses with commas
      bootstrap-servers: ip1:port1,ip2:port2,ip3:port3
      # Whether the consumer commits offsets automatically at a regular interval
      enable-auto-commit: true
      # How often the consumer auto-commits offsets, in milliseconds
      auto-commit-interval: ...
      client-id: 123
      # Minimum amount of data the server should return for a fetch request, in bytes
      fetch-min-size: 1
      # Maximum time (milliseconds) the server blocks before answering a fetch
      # request when there is not yet enough data to satisfy fetch-min-size
      fetch-max-wait: 3000
      # What to do when there is no initial offset or the current offset no longer
      # exists on the server. The default, latest, resets to the latest offset.
      auto-offset-reset: latest
      # Unique identifier of the consumer group this consumer belongs to
      group-id: consumer1
      # Heartbeat interval, in milliseconds
      heartbeat-interval: 3000
      # Deserializer class for keys
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer class for values
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Maximum number of records returned by a single poll(); the default is 500
      max-poll-records: 5000
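Spring Boot hands these settings to the Kafka client, which uses its own dotted property names. As a rough illustration only, the sketch below lists the client property that corresponds to each key used under spring.kafka.producer and spring.kafka.consumer in this configuration. The class and method names (`KafkaPropertyNames`, `producerKey`, `consumerKey`) are hypothetical helpers made up for this example and are not part of Spring or Kafka; only the property names themselves come from the Kafka client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical lookup helper, for illustration only (not a Spring or Kafka class).
final class KafkaPropertyNames {

    // Key under spring.kafka.producer -> Kafka producer client property
    static final Map<String, String> PRODUCER = new LinkedHashMap<>();
    // Key under spring.kafka.consumer -> Kafka consumer client property
    static final Map<String, String> CONSUMER = new LinkedHashMap<>();

    static {
        PRODUCER.put("bootstrap-servers", "bootstrap.servers");
        PRODUCER.put("acks", "acks");
        PRODUCER.put("batch-size", "batch.size");
        PRODUCER.put("buffer-memory", "buffer.memory");
        PRODUCER.put("client-id", "client.id");
        PRODUCER.put("key-serializer", "key.serializer");
        PRODUCER.put("value-serializer", "value.serializer");
        PRODUCER.put("retries", "retries");
        PRODUCER.put("compression-type", "compression.type");

        CONSUMER.put("bootstrap-servers", "bootstrap.servers");
        CONSUMER.put("enable-auto-commit", "enable.auto.commit");
        CONSUMER.put("auto-commit-interval", "auto.commit.interval.ms");
        CONSUMER.put("client-id", "client.id");
        CONSUMER.put("fetch-min-size", "fetch.min.bytes");
        CONSUMER.put("fetch-max-wait", "fetch.max.wait.ms");
        CONSUMER.put("auto-offset-reset", "auto.offset.reset");
        CONSUMER.put("group-id", "group.id");
        CONSUMER.put("heartbeat-interval", "heartbeat.interval.ms");
        CONSUMER.put("key-deserializer", "key.deserializer");
        CONSUMER.put("value-deserializer", "value.deserializer");
        CONSUMER.put("max-poll-records", "max.poll.records");
    }

    // Returns the Kafka client property for a producer key, or null if it is not listed.
    static String producerKey(String springKey) {
        return PRODUCER.get(springKey);
    }

    // Returns the Kafka client property for a consumer key, or null if it is not listed.
    static String consumerKey(String springKey) {
        return CONSUMER.get(springKey);
    }

    public static void main(String[] args) {
        PRODUCER.forEach((spring, kafka) ->
                System.out.println("spring.kafka.producer." + spring + " -> " + kafka));
        CONSUMER.forEach((spring, kafka) ->
                System.out.println("spring.kafka.consumer." + spring + " -> " + kafka));
    }
}
```

Knowing the client-side name makes it easier to look a setting up in the Kafka documentation, where the defaults and valid values are described.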
Producer code
Messages are sent with the KafkaTemplate API
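import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Producer code
 * @date 2020/4/27
 */
@Component
public class Product {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Sends msg to the given topic. With StringSerializer configured as the
    // value-serializer, msg must be a String at runtime.
    public void sendMsg(String topic, Object msg) {
        kafkaTemplate.send(topic, msg);
    }
}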
Consumer code
Messages are consumed with the @KafkaListener annotation
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Consumer code
 * @date 2020/4/27
 */
@Component
public class Consumer {

    @KafkaListener(topics = "topic")
    public void consumer(ConsumerRecord<?, ?> record) {
        Optional<Object> optional = Optional.ofNullable(record.value());
        if (optional.isPresent()) {
            // o has already been deserialized by the value-deserializer class
            // set in the configuration file
            Object o = optional.get();
        }
    }
}
- The @KafkaListener(topics = "...") annotation marks the method as a message listener. Its topics attribute names the topic (or topics) to consume
- The consumer also uses the isPresent() and get() methods of Optional. isPresent() returns true when the Optional holds a value, and get() returns that value
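To make the Optional handling concrete, here is a minimal sketch using only the JDK. A record value can be null, so wrapping it with Optional.ofNullable avoids a null check scattered through the listener. The class `OptionalValueDemo` and its methods are hypothetical names invented for this example; the first method mirrors the isPresent()/get() style of the consumer, and the second shows an equivalent, more compact form with map() and orElse().

```java
import java.util.Optional;

// Hypothetical demo class, for illustration only.
final class OptionalValueDemo {

    // Explicit style, as in the consumer: check isPresent(), then call get().
    static String describeExplicit(Object value) {
        Optional<Object> optional = Optional.ofNullable(value);
        if (optional.isPresent()) {
            return "received: " + optional.get();
        }
        return "empty";
    }

    // Equivalent compact style: map() runs only when a value is present,
    // and orElse() supplies the fallback for the empty case.
    static String describeCompact(Object value) {
        return Optional.ofNullable(value)
                .map(v -> "received: " + v)
                .orElse("empty");
    }

    public static void main(String[] args) {
        System.out.println(describeExplicit("hello")); // prints "received: hello"
        System.out.println(describeCompact(null));     // prints "empty"
    }
}
```

Both methods return the same result for every input, so which one to use is mostly a matter of taste.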
That is a brief look at integrating Kafka with Spring Boot. Comments and corrections are welcome, since they help me deepen my understanding. May your code be bug-free, and thank you!