Background
Kafka is a distributed, partitioned, replicated messaging system that is now widely used by Internet companies. In our company it mainly supports scheduled message delivery, batch data processing, log upload, and similar applications. I found that most blogs online still develop directly against the client provided by Apache. Here I recommend Spring for Apache Kafka (Spring-Kafka for short): it is updated and maintained steadily, offers many methods, is powerful, and has been added to the Spring family.
Pitfall warning
Spring-Kafka is actually a wrapper and extension of the Apache kafka-clients library, so pay attention when using it: the kafka-clients version that Spring-Kafka pulls in must correspond to the version of the Kafka server, otherwise problems such as consumption failures will occur. The compatibility matrix between Spring for Apache Kafka, kafka-clients, and the Kafka server can be found in the official Spring for Apache Kafka documentation.
Using the producer
Spring-Kafka provides two template classes for producers:
- KafkaTemplate: Wraps a producer and provides a convenient way to send data to Kafka topics.
- ReplyingKafkaTemplate: a subclass of KafkaTemplate that adds request/reply functionality: sending data returns a future that encapsulates the consumer's response (a minimal sketch follows this list).
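As a quick illustration of the request/reply pattern, here is a minimal sketch; the topic names, group id, and bean wiring are illustrative assumptions rather than part of the original project, built around the ReplyingKafkaTemplate constructor and its sendAndReceive() method:

@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
        ProducerFactory<String, String> pf,
        ConcurrentMessageListenerContainer<String, String> repliesContainer) {
    return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}

@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
    // The template consumes its replies through this container;
    // "replyTopic" and "replyGroup" are illustrative names
    ConcurrentMessageListenerContainer<String, String> container =
            containerFactory.createContainer("replyTopic");
    container.getContainerProperties().setGroupId("replyGroup");
    container.setAutoStartup(false);
    return container;
}

// Sending returns a future that completes when the reply arrives
RequestReplyFuture<String, String, String> future =
        replyingTemplate.sendAndReceive(new ProducerRecord<>("requestTopic", "ping"));
ConsumerRecord<String, String> reply = future.get(10, TimeUnit.SECONDS);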
This article mainly introduces KafkaTemplate. For its use, the official documentation offers the following case:

To use the template, configure a producer factory and provide it in the template’s constructor:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}
— Spring for Apache Kafka
The official documentation only shows how to configure and construct a template from the factory. What we need is a complete case that can be used in the project. Here I present a complete producer case integrated with Spring Boot.
Start by constructing a template class:
@Configuration
public class TestTemplateConfig {

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // kafka.metadata.broker.list=10.16.0.214:9092,10.16.0.215:9092,10.16.0.216:9092
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5048576);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean(name = "testTemplate")
    public KafkaTemplate<String, String> testTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
The @Configuration annotation marks this as a configuration class. Inside it, the KafkaTemplate is built from the DefaultKafkaProducerFactory provided by Spring-Kafka, and the @Bean annotation on the method registers the template with the Spring container, which instantiates it as a bean.
Send a message
@Resource
private KafkaTemplate<String, String> testTemplate;

// Synchronous send: block until the broker acknowledges, or time out
public void syncSend() throws Exception {
    // "result" is the payload object from the surrounding business code
    testTemplate.send("topic", result.toString()).get(10, TimeUnit.SECONDS);
}

// Asynchronous send: return immediately and handle the outcome in a callback
public void asyncSend() {
    ListenableFuture<SendResult<String, String>> future = testTemplate.send("topic", result.toString());
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("success");
        }

        @Override
        public void onFailure(Throwable ex) {
            System.out.println("failure");
        }
    });
}
Consumers
For the consumer side, Spring-Kafka offers two approaches:

Messages can be received by configuring a MessageListenerContainer and providing a message listener, or by using the @KafkaListener annotation.

— Spring for Apache Kafka

That is, you either configure a MessageListenerContainer and provide a message listener, or you use the @KafkaListener annotation to receive messages. MessageListener is actually an interface; the official API offers eight listener interfaces to meet different application scenarios. Here we use the most common one, the MessageListener interface:
public class KafkaConsumerServiceImpl implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // Dispatch by topic
        if ("topic1".equals(data.topic())) {
            // handle topic1
        } else if ("topic2".equals(data.topic())) {
            // handle topic2
        }
    }
}
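As an example of one of the other listener variants, here is a minimal sketch (the class name is an illustrative assumption) of BatchMessageListener, which receives all records returned by one poll in a single call:

public class BatchConsumerServiceImpl implements BatchMessageListener<String, String> {
    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records) {
        // One call per poll, with the whole batch of records
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.topic() + " : " + record.value());
        }
    }
}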
The other approach, using the @KafkaListener annotation, is very convenient:
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"testTopic"})
    public void receive(String message) {
        System.out.println("consume message: " + message);
    }
}
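The annotated method is not limited to a String payload. As a variant (a sketch; the method name is an illustrative assumption), it can take the full ConsumerRecord to access metadata such as topic, partition, and offset, and can point at a specific container factory bean by name:

@KafkaListener(topics = {"testTopic"}, containerFactory = "kafkaListenerContainerFactory")
public void receiveRecord(ConsumerRecord<String, String> record) {
    // The record carries topic/partition/offset metadata alongside the value
    System.out.println(record.topic() + "-" + record.partition()
            + "@" + record.offset() + " : " + record.value());
}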
Either way, a MessageListenerContainer needs to be created. The official documentation provides a fairly detailed case for constructing this container:
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        // ...
        return props;
    }
}
Construction process
Notice that to set container properties, you must use the getContainerProperties() method on the factory. It is used as a template for the actual properties injected into the container.

— Spring for Apache Kafka

In other words, the only way to set listener container properties, such as the poll timeout, is through the setters on factory.getContainerProperties().
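To make the relationship concrete: a MessageListener implementation like the one shown earlier can also be wired into a standalone container through ContainerProperties, without any annotation. A minimal sketch, assuming illustrative topic names and a ConsumerFactory<String, String> bean like the consumerFactory() above:

// Wire the listener into a standalone container via ContainerProperties
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new KafkaConsumerServiceImpl());
KafkaMessageListenerContainer<String, String> container =
        new KafkaMessageListenerContainer<>(consumerFactory(), containerProps);
container.setBeanName("testListener"); // optional, shows up in logs and thread names
container.start();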
Key configuration

Producer configuration

bootstrap.servers: address list of the broker cluster
acks: the acknowledgement level for sends:
- -1: the producer considers a send complete only after all replicas have confirmed receipt of the data
- 0: the producer does not wait for any acknowledgement from the broker
- 1: the producer sends the next record once the leader has received and acknowledged the data
batch.size: when multiple records are sent to the same partition, the producer tries to batch them into fewer requests
linger.ms: the delay the producer waits before sending, giving batches time to fill
buffer.memory: the maximum buffer memory for records waiting to be sent
max.request.size: the maximum size of the data sent in one request
Consumer configuration
concurrency: the number of concurrent consumers in a listener container
session.timeout.ms: the timeout used to detect consumer failures
auto.offset.reset:
- earliest: when a partition has a committed offset, consume from that offset; when there is no committed offset, consume from the beginning
- latest: when a partition has a committed offset, consume from that offset; when there is no committed offset, consume only the data newly produced to the partition
- none: when every partition in the topic has a committed offset, consume from after those offsets; as soon as one partition has no committed offset, throw an exception
enable.auto.commit:

- true: commit offsets automatically
- false: disable automatic offset commits and commit the offset manually after each message is processed

auto.commit.interval.ms: when enable.auto.commit=true, the period at which offsets are committed automatically
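Putting the consumer options above together, here is a sketch of how they map to ConsumerConfig keys; the concrete values are illustrative assumptions:

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);      // detect consumer failures
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // earliest / latest / none
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);       // automatic offset commits
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);  // only used when auto commit is on
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);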
Remark:
For scenarios that demand high data reliability, it is recommended to commit offsets manually and to set acks to "all" (equivalent to -1), so that send() returns successfully only after all replicas have synchronized the data. This way you can determine whether the data was sent successfully. A sketch of this setup follows.
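A minimal sketch of that setup, assuming a recent Spring-Kafka where AckMode lives in ContainerProperties; the props/factory variables and the listener here are illustrative, not the project's actual code:

// Producer side: "all" is equivalent to "-1": wait for every replica
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");

// Consumer side: turn off auto commit ...
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// ... and switch the listener container to manual acknowledgment
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

// Commit the offset only after the message has been processed
@KafkaListener(topics = {"testTopic"})
public void receive(String message, Acknowledgment ack) {
    // business processing first ...
    ack.acknowledge(); // ... then manually commit the offset
}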