The fastest shortcut in the world is to keep your feet on the ground. Follow along for more sharing.

Open Source projects:

  • Distributed Monitoring (Gitee GVP Most Valuable Open Source Project): gitee.com/sanjianketh…
  • Personal website: www.jiagoujishu.com

Spring Boot series:

Spring Boot series (1) : Start the SpringApplication

Spring Boot series (2) Configuration feature analysis

Spring Boot series (3) : the latest version of elegant shutdown details

Spring Boot series (4) : Log dynamic configuration details

0. Brief introduction

Spring Boot version: 2.3.4.RELEASE

With the development of big data, Kafka is used in more and more of our projects, and its high performance covers most of our scenarios, so it is important to learn how to use Kafka well with Spring Boot.

This article covers the following points:

  • Sending a message
  • Send callbacks
  • Implementation principle
  • Asynchronous and synchronous sending

1. Add dependencies

 <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
 </dependency>

2. Add the configuration

In Spring Boot, Kafka configuration properties all start with the spring.kafka.* prefix.

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
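Beyond the two properties above, the spring.kafka.producer.* namespace exposes the common producer settings. An illustrative fragment (the values here are examples, not recommendations; StringSerializer is in fact already the default):

```properties
# reliability: wait for all in-sync replicas to acknowledge (example value)
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
# serializers matching the KafkaTemplate<String, String> used below
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```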

3. Send the code

Spring Boot's Kafka support follows the familiar XxxTemplate pattern, so naturally we use KafkaTemplate:

@Component
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String msg) {
        kafkaTemplate.send("testTOPIC", msg);
    }
}

4. Parameter Description

ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);

KafkaTemplate.send has a number of overloads; the parameters are described below:

  • Topic: the name of the topic to send to

  • Partition: the ID of the partition to send to; numbering starts from 0

  • Timestamp: the timestamp of the record

  • Key: the key of the message

  • Data: the message payload

  • ProducerRecord: encapsulation class for a message, containing all of the above parameters

  • Message<?>: Spring Messaging's Message type, which wraps the payload and its headers
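As an illustration, a few of these overloads can be exercised as follows. This is a sketch in the spirit of the Producer component above; the topic name and the String/String template wiring are assumptions carried over from that example:

```java
import javax.annotation.Resource;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class ProducerVariants {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void examples() {
        // topic + value
        kafkaTemplate.send("testTOPIC", "value-only");
        // topic + key + value: records with the same key land in the same partition
        kafkaTemplate.send("testTOPIC", "user-1", "keyed value");
        // topic + partition + key + value: partition IDs start from 0
        kafkaTemplate.send("testTOPIC", 0, "user-1", "to partition 0");
        // a full ProducerRecord, which can carry all of the above
        kafkaTemplate.send(new ProducerRecord<>("testTOPIC", "user-1", "via record"));
    }
}
```

Note that all of these return immediately with a ListenableFuture; none of them block waiting for the broker.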

5. Send callback

Spring Boot's KafkaAutoConfiguration provides a ProducerListener hook for message callbacks so that we can process send results: onSuccess is called on success and onError on failure. Add the following class:

@Component
public class KafkaSendResultHandler implements ProducerListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Message send success : " + producerRecord.toString());
    }

    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        log.error("Message send error : " + producerRecord.toString(), exception);
    }
}
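A ProducerListener like the one above is global to the template. For a per-send callback, the ListenableFuture returned by send can be used directly; a minimal sketch, assuming the same String/String template and topic as earlier:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

public class PerSendCallbackExample {

    // Assumes a KafkaTemplate<String, String> obtained from the Spring context.
    public void sendWithCallback(KafkaTemplate<String, String> kafkaTemplate, String msg) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("testTOPIC", msg);
        // success and failure callbacks run when the broker acknowledges (or the send fails)
        future.addCallback(
                result -> System.out.println("sent to " + result.getRecordMetadata()),
                ex -> System.err.println("send failed: " + ex.getMessage()));
    }
}
```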

6. Implementation principle

From the above, we can send Kafka messages in two or three lines of code. So how is all of this actually loaded and wired up?

The Kafka auto-configuration lives in spring-boot-autoconfigure, in the KafkaAutoConfiguration class.

1. KafkaAutoConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
}

Several things can be seen from KafkaAutoConfiguration:

1. @ConditionalOnClass(KafkaTemplate.class): the auto-configuration only takes effect when KafkaTemplate is on the classpath

2. @EnableConfigurationProperties(KafkaProperties.class): binds the spring.kafka.* properties to KafkaProperties for our use

3. The @Import brings in two classes when loading: KafkaAnnotationDrivenConfiguration, which provides consumer annotation support, and KafkaStreamsAnnotationDrivenConfiguration, which provides Kafka Streams annotation support

2. KafkaTemplate

Instead of posting a lot of code here, you can look at the complete KafkaAutoConfiguration.

@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
        ProducerListener<Object, Object> kafkaProducerListener,
        ObjectProvider<RecordMessageConverter> messageConverter) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
    kafkaTemplate.setProducerListener(kafkaProducerListener);
    kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
    return kafkaTemplate;
}

@Bean
@ConditionalOnMissingBean(ProducerListener.class)
public ProducerListener<Object, Object> kafkaProducerListener() {
    return new LoggingProducerListener<>();
}

@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() {
    // body elided: builds a DefaultKafkaProducerFactory from the bound KafkaProperties
    return factory;
}

The main beans related to KafkaTemplate are extracted here. The key points:

  • All three methods use the @ConditionalOnMissingBean annotation; the underlying reason for this is to make the defaults easy to override and extend
  • The kafkaProducerListener bean is used to build the Callback when doSend is called, so that we can monitor whether each message was sent successfully
  • The ProducerFactory actually creates the producer. If transactionIdPrefix is configured, the producer supports transactions; when a transaction is active, the producer is first fetched from a ThreadLocal and only created if absent (see getTheProducer in KafkaTemplate, around line 341)
  • If you configure spring.kafka.producer.transaction-id-prefix, a KafkaTransactionManager transaction manager is also created
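As an illustration of the @ConditionalOnMissingBean extension point: defining your own ProducerFactory bean makes the auto-configured one back off. A minimal sketch; the property values are examples, not recommendations:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class CustomKafkaProducerConfig {

    // Because the auto-configured factory is @ConditionalOnMissingBean,
    // this bean takes precedence and the default one is not created.
    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // example value
        return new DefaultKafkaProducerFactory<>(props);
    }
}
```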

Loading process:

1. Since we added the spring-kafka JAR, KafkaAutoConfiguration is loaded at startup via the spring.factories SPI mechanism

2. When @ConditionalOnMissingBean finds that no KafkaTemplate has been configured by the user, the ProducerListener and ProducerFactory beans are created in order to build the KafkaTemplate

Sending process:

1. KafkaTemplate.send(String topic, @Nullable V data) wraps the arguments into a ProducerRecord<K, V>

2. KafkaTemplate internally calls doSend(ProducerRecord<K, V> producerRecord)

3. doSend calls getTheProducer(): inside a transaction the Producer is fetched from a ThreadLocal, otherwise a Producer is created

4. A SettableListenableFuture callback is constructed

5. Finally the underlying KafkaProducer.send(ProducerRecord<K, V> record, Callback callback) is invoked

7. Asynchronous and synchronous sending

The source code for KafkaTemplate shows that messages are sent asynchronously: KafkaTemplate wraps the parameters we pass into a ProducerRecord and calls the doSend method.

public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
    return this.doSend(producerRecord);
}

The doSend(producerRecord) method first checks whether a transaction is in progress, calls this.getTheProducer() to get the producer, constructs a SettableListenableFuture callback, and finally invokes KafkaProducer.send(ProducerRecord<K, V> record, Callback callback):

protected ListenableFuture<SendResult<K, V>> doSend(ProducerRecord<K, V> producerRecord) {
        if (this.transactional) {
            Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
        }

        Producer<K, V> producer = this.getTheProducer();
        this.logger.trace(() -> {
            return "Sending: " + producerRecord;
        });
        SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
        producer.send(producerRecord, this.buildCallback(producerRecord, producer, future));
        if (this.autoFlush) {
            this.flush();
        }

        this.logger.trace(() -> {
            return "Sent: " + producerRecord;
        });
        return future;
    }

Synchronous message sending

Some business scenarios require sending messages synchronously, and the implementation is actually quite simple: since send returns a ListenableFuture<SendResult<K, V>>, all we need to do is call the get method.

public void syncSend() throws ExecutionException, InterruptedException {
    kafkaTemplate.send("demo", "test sync message").get();
}
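A plain get() blocks indefinitely if the broker never responds; in practice a bounded wait is often safer. A sketch with a timeout (the 10-second value is just an example):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.springframework.kafka.core.KafkaTemplate;

public class SyncSendWithTimeout {

    // Assumes a KafkaTemplate<String, String> from the Spring context.
    public void syncSend(KafkaTemplate<String, String> kafkaTemplate)
            throws ExecutionException, InterruptedException, TimeoutException {
        // Blocks for at most 10 seconds; throws TimeoutException if the broker
        // has not acknowledged the record by then.
        kafkaTemplate.send("demo", "test sync message").get(10, TimeUnit.SECONDS);
    }
}
```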
