Kafka SpringBoot integration
1. Kafka has been installed
2. Create a SpringBoot project
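For reference, a minimal application entry point (class and package names here are arbitrary examples):
package com.example.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}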
Add the project dependency:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Add the configuration file application.properties
########### [Kafka cluster] ###########
spring.kafka.bootstrap-servers=192.168.2.243:9092,192.168.2.244:9092,192.168.2.245:9092
########### [Producer configuration] ###########
# Number of retries
spring.kafka.producer.retries=0
# Ack level: how many partition replicas must have received the message before an ACK is sent to the producer (0, 1, all/-1)
spring.kafka.producer.acks=1
# Batch size in bytes (16384 = 16KB; 5120 = 5KB)
spring.kafka.producer.batch-size=5120
# Commit delay
spring.kafka.producer.properties.linger.ms=0
# The producer submits a batch to Kafka once it has accumulated batch-size bytes of messages, or once linger.ms has elapsed after a message arrives.
# A linger.ms of 0 means every message is submitted to Kafka immediately; in that case batch-size has no effect.
# Producer buffer size (33554432 = 32MB; 5242880 = 5MB)
spring.kafka.producer.buffer-memory=5242880
# Serializer classes provided by Kafka
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Custom partitioner
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
########### [Consumer configuration] ###########
# Default consumer group ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# Whether to commit offsets automatically
spring.kafka.consumer.enable-auto-commit=true
# Offset commit interval (how long after a message is received before its offset is committed)
spring.kafka.consumer.auto-commit-interval=1000
# What to do when there is no initial offset in Kafka, or the current offset is out of range:
# earliest: reset to the smallest offset in the partition;
# latest: reset to the latest offset in the partition (only consume data newly produced in the partition);
# none: throw an exception whenever a partition has no committed offset;
spring.kafka.consumer.auto-offset-reset=latest
# If the consumer sends no heartbeat within this time, a rebalance is triggered
spring.kafka.consumer.properties.session.timeout.ms=120000
# Consumer request timeout
spring.kafka.consumer.properties.request.timeout.ms=180000
# Deserializer classes provided by Kafka
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Don't fail application startup when a topic the consumer listens to does not exist
spring.kafka.listener.missing-topics-fatal=false
# Enable batch consumption
# spring.kafka.listener.type=batch
# Maximum number of messages consumed in one poll
# spring.kafka.consumer.max-poll-records=50
(Optional) Add a configuration class that creates topics
package com.example.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaInitialConfiguration {
    // Create a topic named topic-test-llc with 8 partitions and a replication factor of 2
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("topic-test-llc", 8, (short) 2);
    }
    // To change the number of partitions, just change the configured value and restart the project.
    // Changing the partition count does not lose data, but partitions can only be increased, never decreased.
    @Bean
    public NewTopic updateTopic() {
        return new NewTopic("topic-test-llc", 10, (short) 2);
    }
}
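To verify the result, you can describe the topic with the standard Kafka CLI on recent Kafka versions (a sketch; adjust the script path and broker address to your installation):
bin/kafka-topics.sh --describe --topic topic-test-llc --bootstrap-server 192.168.2.243:9092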
Producer without a callback
package com.example.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    // Send a message
    @GetMapping("/kafka/normal/{message}")
    public String sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("topic-test-llc", normalMessage);
        return "ok";
    }
}
Kafka consumers
package com.example.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
    // Consumer listener
    @KafkaListener(topics = {"topic-test-llc"})
    public void onMessage1(ConsumerRecord<?, ?> record) {
        // Print which topic and partition the message was consumed from, and its value
        System.out.println("Simple consumption - topic: " + record.topic() + ", partition: " + record.partition() + ", value: " + record.value());
    }
}
The consumer can be a little slow to start up, so you may have to wait a moment before it receives messages from Kafka.
Access path:
localhost:8080/kafka/normal/aaa
The example above creates a producer that sends a message to topic-test-llc, and a consumer that listens on topic-test-llc via the @KafkaListener annotation; its topics attribute can listen to several topics at once. Start the project and call the interface (for example with Postman) to trigger the producer to send a message.
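For example, from the command line (assuming the application runs on the default port 8080):
curl localhost:8080/kafka/normal/aaa
The consumer console should then print the consumed message.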
3. Producer with a callback
The future returned by KafkaTemplate.send() provides an addCallback method, which lets us react to whether a message was sent successfully or failed. There are two ways to write it.
The first:
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
// The topic to which the message is sent
String topic = success.getRecordMetadata().topic();
// The partition to which the message is sent
int partition = success.getRecordMetadata().partition();
// The offset of the message in the partition
long offset = success.getRecordMetadata().offset();
System.out.println("Message sent successfully :" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("Failed to send message :" + failure.getMessage());
});
}
The second:
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("Failed to send message:"+ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("Message sent successfully:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-"+ result.getRecordMetadata().offset()); }}); }Copy the code
4. Custom partitioning
Every topic in Kafka is divided into partitions. When a producer sends a message to a topic, which partition is it appended to? That is decided by the partitioning strategy. Kafka provides a default strategy, but it also supports custom ones. The routing mechanism is as follows:
(1) If a partition is specified, the message is appended to that partition.
(2) If no partition is given but a key is (Kafka allows each message to carry a key), the key is hashed and the hash is used to route the message to a partition; all messages with the same key are therefore guaranteed to land in the same partition.
(3) If neither partition nor key is specified, Kafka's default strategy picks a partition in round-robin fashion.
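As a quick sketch, these three cases map onto the KafkaTemplate.send overloads like this (topic name, key, and values are arbitrary examples):
// (1) Partition specified explicitly: goes to partition 2 regardless of the key
kafkaTemplate.send("topic1", 2, "myKey", "explicit partition");
// (2) No partition, but a key: the hash of "myKey" picks the partition,
// so every message with this key lands in the same partition
kafkaTemplate.send("topic1", "myKey", "routed by key hash");
// (3) Neither partition nor key: the default partitioner chooses one
kafkaTemplate.send("topic1", "default routing");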
To customize this, we create a partitioner class that implements the Partitioner interface and overrides the partition() method; its return value is the partition the message will be sent to.
package com.example.kafka.config;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Custom partitioning rule (here: send everything to partition 0)
        return 0;
    }
    @Override
    public void close() {}
    @Override
    public void configure(Map<String, ?> configs) {}
}
Configure the custom partitioner in application.properties. The configured value is the fully qualified class name of the partitioner class:
# Custom partition
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
5. Kafka transaction commit
If a message needs to be sent inside a transaction, declare one with KafkaTemplate's executeInTransaction method:
@GetMapping("/kafka/transaction")
public void sendMessageTransaction(a){
// Life transaction, the following error message will not be sent
kafkaTemplate.executeInTransaction(operations ->{
operations.send("topic"."test executeInTransaction");
throw new RuntimeException("fail");
});
// Do not declare the transaction, save later but the front-end message has been sent successfully
kafkaTemplate.send("topic"."test executeInTransaction");
throw new RuntimeException("fail");
}
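Note that executeInTransaction fails at runtime unless the producer is transactional. A minimal sketch of the extra application.properties entries this needs (the prefix value is an arbitrary example; enabling transactions also forces idempotence, so acks and retries must differ from the earlier sample config):
# Enable a transactional producer
spring.kafka.producer.transaction-id-prefix=tx-
# Transactions require acks=all and retries > 0
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3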
6. Consumers
Specify the topic, partition, and offset to consume
In the previous examples, subscribing to topic1 meant consuming all messages from all of its partitions by default. @KafkaListener also lets us be more precise and pin a listener to specific topics, partitions, and starting offsets. This is just as simple: the annotation provides everything for us.
/**
 * @Title Specify topic, partition, and offset to consume
 * @Description Listen to partition 0 of topic1 and partition 0 of topic2, plus partition 1 of topic2 starting from offset 8
 * @param record
 */
@KafkaListener(id = "consumer1", groupId = "felix-group", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = {"0"}),
        @TopicPartition(topic = "topic2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
    System.out.println("topic: " + record.topic() + ", partition: " + record.partition()
            + ", offset: " + record.offset() + ", value: " + record.value());
}
Attribute explanation:
① id: the consumer ID;
② groupId: the consumer group ID;
③ topics: the topics to listen to; several can be listened to at once;
④ topicPartitions: more fine-grained listening configuration; lets you specify the exact topic, partition, and offset to listen to.
onMessage2 above listens to partition 0 of topic1, and to partitions 0 and 1 of topic2, where partition 1 is consumed starting from offset 8.
Note: topics and topicPartitions cannot be used together.
7. Batch consumption
Enable batch consumption in application.properties:
# Enable batch consumption
spring.kafka.listener.type=batch
# Maximum number of messages consumed in one poll
spring.kafka.consumer.max-poll-records=50
A List is used to receive messages, and the listening code is as follows:
@KafkaListener(id="consumer2",groupId = "felix-group",topics = "topic1" )
public void onMesssage(List
> records)
>{
System.out.println(">>> Batch consume once, records.size()="+records.size());
for(ConsumerRecord<? ,? > record:records){ System.out.println(record.value()); }}Copy the code
8. ConsumerAwareListenerErrorHandler exception handler
With the exception handler, we can handle exceptions that occur when a consumer is consuming.
Create a new exception handler of type ConsumerAwareListenerErrorHandler and register it with @Bean (the bean name defaults to the method name). Then set the errorHandler attribute of the @KafkaListener annotation to that bean name; when the listener throws an exception, the handler is invoked automatically.
// Exception handler, registered with @Bean (bean name = method name)
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
    return (message, exception, consumer) -> {
        System.out.println("Consumption error: " + message.getPayload());
        return null;
    };
}
// Put the bean name of the exception handler in the errorHandler attribute of @KafkaListener
@KafkaListener(topics = {"topic1"}, errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
    throw new Exception("Simple consumption - simulated exception");
}
// message.getPayload() in the handler can also fetch the content of each message in a batch
@KafkaListener(topics = "topic1", errorHandler = "consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
    System.out.println("Batch consumption...");
    throw new Exception("Batch consumption - simulated exception");
}
9. Message filters
A message filter can intercept messages before they reach the listener. In practice we can use it to apply our business logic: messages we need are passed on to the @KafkaListener, and messages we don't need are filtered out.
To configure message filtering, we only need to set a RecordFilterStrategy on the listener container factory: if it returns true the message is discarded, and if it returns false the message reaches the listener normally.
package com.example.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
    @Autowired
    ConsumerFactory consumerFactory;
    // Message filter
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // Filtered messages are discarded
        factory.setAckDiscarded(true);
        // Message filtering strategy: keep even values
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }
            // Messages for which true is returned are filtered out
            return true;
        });
        return factory;
    }
    // Listener using the filtering container factory
    @KafkaListener(topics = {"topic1"}, containerFactory = "filterContainerFactory")
    public void onMessage6(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}
Send the numbers 0 through 99 as messages to topic1: the listener prints only the even values, and the odd ones are filtered out.
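A minimal sketch of a test endpoint for this, reusing the kafkaTemplate from the producer controller above (the mapping path is an arbitrary example):
@GetMapping("/kafka/filter/test")
public String sendFilterTest() {
    // Send 0..99 to topic1; the filter should drop every odd value
    for (int i = 0; i < 100; i++) {
        kafkaTemplate.send("topic1", String.valueOf(i));
    }
    return "ok";
}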
10. Message forwarding
In actual development we may have a requirement like this: application A obtains a message from topicA, processes it, and forwards it to topicB, where application B listens and processes it further. That is, once one application finishes processing, the message is forwarded to other applications.
Integrating Kafka with Spring Boot makes this easy: a single @SendTo annotation implements message forwarding, and the return value of the annotated method becomes the content of the forwarded message, as follows:
/**
 * @Title Message forwarding
 * @Description Messages received from topic1 are processed and forwarded to topic2
 * @param record
 * @return
 */
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
    return record.value() + "-forward message";
}
11. Start and stop the listener periodically
By default, a listener starts when the consumer project starts and immediately listens for messages sent to its topic. If we don't want the listener to work right away, but want it to start at a time we specify, or stop at a time we specify, we can use KafkaListenerEndpointRegistry. Let's implement it:
① Disable the listener's auto-start;
② Create two scheduled tasks: one starts the listener at the specified time, the other stops it at the specified time.
Create a scheduled-task class declared with the @EnableScheduling annotation. KafkaListenerEndpointRegistry is already registered as a bean in the Spring IoC container, so we can inject it directly. Then configure a listener container factory that disables auto-start of the KafkaListener:
@EnableScheduling
@Component
public class CronTimer {
    /**
     * A method annotated with @KafkaListener is not registered as a bean in the IoC container,
     * but in the KafkaListenerEndpointRegistry,
     * and KafkaListenerEndpointRegistry itself is already registered as a bean in the Spring IoC container
     */
    @Autowired
    private KafkaListenerEndpointRegistry registry;
    @Autowired
    private ConsumerFactory consumerFactory;
    // Listener container factory (configured so the KafkaListener does not auto-start)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        // Disable auto-start of the KafkaListener
        container.setAutoStartup(false);
        return container;
    }
    // Listener
    @KafkaListener(id = "timingConsumer", topics = "topic1", containerFactory = "delayContainerFactory")
    public void onMessage1(ConsumerRecord<?, ?> record) {
        System.out.println("Consumed successfully: " + record.topic() + "-" + record.partition() + "-" + record.value());
    }
    // Start the listener on a schedule
    @Scheduled(cron = "0 42 11 * * ?")
    public void startListener() {
        System.out.println("Starting the listener...");
        // "timingConsumer" is the listener ID set in the @KafkaListener annotation; it identifies the listener
        if (!registry.getListenerContainer("timingConsumer").isRunning()) {
            registry.getListenerContainer("timingConsumer").start();
        }
        //registry.getListenerContainer("timingConsumer").resume();
    }
    // Stop the listener on a schedule
    @Scheduled(cron = "0 45 11 * * ?")
    public void shutDownListener() {
        System.out.println("Stopping the listener...");
        registry.getListenerContainer("timingConsumer").pause();
    }
}
Start the project and trigger the producer to send messages to topic1. You can see that the consumer does not consume them, because the listener has not started working yet.