
1 Introduction

This article is comprehensive, long, and detailed, so take your time with it. If you run into any problems, feel free to get in touch and discuss them at any time.

This article introduces common Spring Kafka configuration, automatic topic creation, publishing messages to a cluster, subscribing to messages (consumer groups), stream processing configuration, and using an embedded Kafka for tests. It then implements message publishing and subscribing in two ways, one of which is based on Spring Integration. Spring Kafka is a Spring client for Apache Kafka (producer, consumer, stream processing, and so on). Spring Boot simplifies its configuration further by auto-configuring the spring-kafka project through properties prefixed with spring.kafka.*, so using Kafka in Spring Boot is particularly simple. An embedded Kafka broker is also available for easy testing.

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

The examples that follow require this environment:

  • A Java runtime or development environment (JRE/JDK)
  • A working Kafka installation

For more on setup, see the article "Kafka, ZK Cluster Development or Deployment Environment Setup and Experiment".

This article tries to keep a clear line of reasoning. It first gives an overall introduction to the main features and key configuration of Spring Kafka, then shows how Spring Boot simplifies that configuration further and implements publishing and subscribing with the main Spring Boot Kafka annotations. It also implements a more complex publish and subscribe setup with Spring Integration plus custom Kafka configuration. The material comes from my own experiments and a long period of organizing notes, and it covers most of Spring Kafka. I hope you read on patiently. Send feedback on any questions at any time, and let's learn together.

2 Spring Kafka Features Overview

Spring Kafka, Spring Integration, and kafka-clients versions are compatible as follows (as of December 9, 2019):

Spring for Apache Kafka    Spring Integration for Apache Kafka    kafka-clients
2.3.x                      3.2.x                                  2.3.1
2.2.x                      3.1.x                                  2.0.1, 2.1.x, 2.2.x
2.1.x                      3.0.x                                  1.0.x, 1.1.x, 2.0.0
1.3.x                      2.3.x                                  0.11.0.x, 1.0.x

Spring Kafka is currently available in version 2.3.4.

Spring Kafka has the following annotations:

Annotation type       Description
EnableKafka           Enables the Kafka listener annotated endpoints that are created under the covers by an AbstractKafkaListenerContainerFactory; placed on configuration classes
EnableKafkaStreams    Enables the default Kafka Streams components
KafkaHandler          Marks a method as the target of a Kafka message listener within a class that is annotated with KafkaListener
KafkaListener         Marks a method as the target of a Kafka message listener on the specified topics
KafkaListeners        Container annotation that aggregates several KafkaListener annotations
PartitionOffset       Adds partition/initial offset information to a KafkaListener
TopicPartition        Adds topic/partition information to a KafkaListener

For example, @EnableKafka enables the listener endpoints created by subclasses of AbstractKafkaListenerContainerFactory, and ConcurrentKafkaListenerContainerFactory is one such subclass.

public class ConcurrentKafkaListenerContainerFactory<K, V>
        extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V>

@Configuration
@EnableKafka
public class AppConfig {
    @Bean
    public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        // consumerFactory() is expected to be one of the other @Bean definitions
        factory.setConsumerFactory(consumerFactory());
        // Run four listener threads
        factory.setConcurrency(4);
        return factory;
    }
    // other @Bean definitions
}

In Spring Boot you do not need to add @EnableKafka explicitly, because Spring Boot ships with auto-configuration for Spring Kafka. If you write your own Kafka configuration class, add @EnableKafka to it. If you do not want Kafka to be auto-configured, for example in tests, exclude KafkaAutoConfiguration:

@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")

2.1 Automatically Creating Topics

💡 To create topics at application startup, you can add beans of type NewTopic. If the topic already exists, the beans are ignored.
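As a minimal sketch of such a bean: the class name TopicConfig, the topic name myTopic, and the partition and replica counts below are hypothetical values chosen only for illustration. The NewTopic constructor used here takes the topic name, the number of partitions, and the replication factor.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    // Hypothetical topic: name, partition count and replication factor
    // are illustrative values, not taken from the article.
    @Bean
    public NewTopic myTopic() {
        // NewTopic(name, numPartitions, replicationFactor)
        return new NewTopic("myTopic", 3, (short) 1);
    }
}
```

With Spring Boot's auto-configured KafkaAdmin in place, a bean like this should be picked up at startup without further wiring.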

2.2 Sending Messages

Spring Boot auto-configures a KafkaTemplate, and you can autowire it directly into your own beans, as shown in the following example:

@Component
public class MyBean {

    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public MyBean(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}

The KafkaTemplate wraps a producer and provides convenient methods for sending data to Kafka topics. It provides the following methods:

ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {
    T doInKafka(Producer<K, V> producer);
}

The sendDefault API requires that a default topic has been provided to the template. Some methods accept a timestamp as a parameter and store it in the record. How the user-supplied timestamp is stored depends on the timestamp type configured on the Kafka topic. If the topic is configured to use CREATE_TIME, the user-specified timestamp is recorded (or generated if not specified). If the topic is configured to use LOG_APPEND_TIME, the user-specified timestamp is ignored and the broker adds its local time. The metrics and partitionsFor methods delegate to the same methods on the underlying Producer. The execute method provides direct access to the underlying Producer.
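The timestamp rule can be written down as a small decision function. This is only a plain-Java model of the behavior described above, not Kafka or Spring code; the class, enum, and parameter names are invented for the illustration.

```java
final class TimestampSketch {

    /** The two timestamp types a topic can be configured with. */
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /**
     * Returns the timestamp that ends up stored with the record.
     *
     * @param type          timestamp type configured on the topic
     * @param userTimestamp timestamp passed to send(), or null if none was given
     * @param producerClock time at which the producer generated the record
     * @param brokerClock   broker local time when the record is appended
     */
    static long storedTimestamp(TimestampType type, Long userTimestamp,
                                long producerClock, long brokerClock) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            // The user-supplied value is ignored; the broker's time wins.
            return brokerClock;
        }
        // CREATE_TIME: use the user's value, or generate one if absent.
        return userTimestamp != null ? userTimestamp : producerClock;
    }
}
```

In practice this means a timestamp passed to send() is only meaningful on topics that use CREATE_TIME.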

To use a template, configure a producer factory and provide it in the template’s constructor. The following example demonstrates how to do this:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // The key type is Integer, so the key serializer must be IntegerSerializer
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    // Pass the producer factory to the KafkaTemplate constructor
    return new KafkaTemplate<Integer, String>(producerFactory());
}

Then, to use the template, you can call one of its methods to send a message.

When you use the method that takes a Message<?> parameter, the topic, partition, and key information is provided in message headers, which include the following:

KafkaHeaders.TOPIC
KafkaHeaders.PARTITION_ID
KafkaHeaders.MESSAGE_KEY
KafkaHeaders.TIMESTAMP

For example, to access an item in the header information:

public void handleMessage(Message<?> message) throws MessagingException {
    LOGGER.debug("===Received Msg Topic: {}", message.getHeaders().get(KafkaHeaders.TOPIC));
}

Optionally, you can configure the KafkaTemplate with a ProducerListener to get an asynchronous callback with the result of the send (success or failure), instead of waiting for the Future to complete. The following listing shows the ProducerListener interface definition:

public interface ProducerListener<K, V> {
    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
    void onError(String topic, Integer partition, K key, V value, Exception exception);
    boolean isInterestedInSuccess();
}

By default, the template is configured with a LoggingProducerListener, which logs errors and does nothing when a send succeeds. onSuccess is called only if isInterestedInSuccess returns true. For convenience, the abstract ProducerListenerAdapter is provided in case you want to implement only one of the methods. It returns false for isInterestedInSuccess. The asynchronous result callback looks like this:

public void sendMessage(String msg) {
    LOGGER.info("===Producing message[{}]: {}", mTopic, msg);
    ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            LOGGER.info("===Producing message success");
        }

        @Override
        public void onFailure(Throwable ex) {
            LOGGER.info("===Producing message failed");
        }
    });
}

If you want to block the sending thread while it waits for the result, you can call the future's get() method. You may want to call flush() before waiting or, for convenience, use the template constructor with an autoFlush argument, which makes the template flush() on every send. Note, however, that flushing is likely to reduce performance significantly:

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        // Block for at most 10 seconds while waiting for the send result
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

Using DefaultKafkaProducerFactory:

A ProducerFactory is used to create producers, as shown above with KafkaTemplate. By default, when transactions are not used, DefaultKafkaProducerFactory creates a singleton producer that is shared by all clients, as recommended in the KafkaProducer Javadoc. However, if you call flush() on the template, this can cause delays for other threads that use the same producer. Starting with version 2.3, DefaultKafkaProducerFactory has a new property, producerPerThread. When set to true, the factory creates (and caches) a separate producer for each thread to avoid this problem.

When producerPerThread is true, user code must call closeThreadBoundProducer() on the factory when the producer is no longer needed. This will actually close the producer and remove it from ThreadLocal. Calls to reset() or destroy() do not clean up these producers.

When you create a DefaultKafkaProducerFactory with the constructor that takes only the properties map (see the KafkaTemplate example above), the key and/or value Serializer classes are picked up from the configuration. Alternatively, Serializer instances can be passed to the DefaultKafkaProducerFactory constructor, in which case all producers share the same instances. You can also provide Supplier<Serializer> instances (starting with version 2.3) to obtain a separate Serializer instance for each producer:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

Using ReplyingKafkaTemplate:

Version 2.1.3 introduced a subclass of KafkaTemplate that provides request/reply semantics. The class is named ReplyingKafkaTemplate and adds one method, sendAndReceive, to those of the superclass. The following listing shows its signatures:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

The result is a ListenableFuture that is populated asynchronously with the result (or with an exception on timeout). The result also has a sendFuture property, which is the result of calling KafkaTemplate.send(). You can use this future to determine the result of the send operation. This article does not go into further detail here.

2.3 Receiving Messages

Messages can be received by configuring a MessageListenerContainer and providing a message listener, or by using the @KafkaListener annotation.

2.3.1 Message listeners

When using a Message Listener Container, listeners must be provided to receive data. There are currently eight interfaces supported by message listeners. The following list shows these interfaces:

// Processes individual ConsumerRecord instances received from the Kafka consumer poll()
// operation when using auto-commit or one of the container-managed commit methods.
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

// Processes individual ConsumerRecord instances received from the Kafka consumer poll()
// operation when using one of the manual commit methods.
public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

// Processes individual ConsumerRecord instances received from the Kafka consumer poll()
// operation when using auto-commit or one of the container-managed commit methods.
// Provides access to the Consumer object.
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

// Processes individual ConsumerRecord instances received from the Kafka consumer poll()
// operation when using one of the manual commit methods.
// Provides access to the Consumer object.
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

// Processes all ConsumerRecord instances received from the Kafka consumer poll() operation
// when using auto-commit or one of the container-managed commit methods.
// AckMode.RECORD is not supported with this interface, because the listener is given the complete batch.
public interface BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data);
}

// Processes all ConsumerRecord instances received from the Kafka consumer poll() operation
// when using one of the manual commit methods.
public interface BatchAcknowledgingMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

// Processes all ConsumerRecord instances received from the Kafka consumer poll() operation
// when using auto-commit or one of the container-managed commit methods.
// AckMode.RECORD is not supported with this interface, because the listener is given the complete batch.
// Provides access to the Consumer object.
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

// Processes all ConsumerRecord instances received from the Kafka consumer poll() operation
// when using one of the manual commit methods.
// Provides access to the Consumer object.
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

The Consumer object is not thread-safe. You must invoke its methods only on the thread that calls the listener.

2.3.1.1 Message listener container

Two MessageListenerContainer implementations are provided:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread (a partition can be assigned to only one consumer in a group, while a consumer can be assigned several partitions). ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption of all the topics or partitions.

Starting with Spring Kafka 2.2.7, you can add a RecordInterceptor to the listener container. It is invoked before the listener is called, which allows the record to be inspected or modified. If the interceptor returns null, the listener is not called. The interceptor is not invoked when the listener is a batch listener. Starting with version 2.3, CompositeRecordInterceptor can be used to invoke multiple interceptors.

By default, when transactions are used, the interceptor is invoked after the transaction has started. Starting with version 2.3.4, you can set the listener container's interceptBeforeTx property to invoke the interceptor before the transaction starts. No interceptor is provided for batch listeners, because Kafka already provides a ConsumerInterceptor.

2.3.1.2 Using KafkaMessageListenerContainer

The following constructors are available:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
        ContainerProperties containerProperties)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
        ContainerProperties containerProperties, TopicPartitionOffset... topicPartitions)

Each takes a ConsumerFactory and information about topics and partitions, as well as other configuration, in a ContainerProperties object. ConcurrentMessageListenerContainer (described later) uses the second constructor to distribute TopicPartitionOffset instances across its consumer instances. ContainerProperties has the following constructors:

public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)

The first constructor takes an array of TopicPartitionOffset arguments to tell the container explicitly which partitions to use (via the consumer's assign() method), with an optional initial offset. By default, a positive value is an absolute offset and a negative value is relative to the current last offset within the partition. TopicPartitionOffset also has a constructor that takes an additional boolean argument. If it is true, the initial offset (positive or negative) is relative to the current position of the consumer. The offsets are applied when the container is started. The second constructor takes an array of topics, and Kafka allocates the partitions based on the group.id property, distributing them across the group. The third uses a regular expression Pattern to select the topics.
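The three offset interpretations can be modeled with a few lines of arithmetic. This is a plain-Java sketch of the rules as described above, not the container's code; the class and parameter names are made up, and clamping the result at zero is an assumption of the sketch.

```java
final class InitialOffsetSketch {

    /**
     * Resolves the offset a consumer would seek to for one partition.
     *
     * @param initialOffset     offset given in the TopicPartitionOffset
     * @param relativeToCurrent the optional boolean constructor argument
     * @param currentPosition   the consumer's current position in the partition
     * @param endOffset         the current last offset of the partition
     */
    static long resolve(long initialOffset, boolean relativeToCurrent,
                        long currentPosition, long endOffset) {
        long target;
        if (relativeToCurrent) {
            // Positive or negative: relative to where the consumer is now.
            target = currentPosition + initialOffset;
        } else if (initialOffset < 0) {
            // Negative: relative to the end of the partition.
            target = endOffset + initialOffset;
        } else {
            // Positive (or zero): an absolute offset.
            target = initialOffset;
        }
        // Assumption of this sketch: never seek before offset 0.
        return Math.max(0, target);
    }
}
```

For example, with the consumer at position 10 in a partition that ends at offset 100, an initial offset of -5 resolves to 95 by default and to 5 when the boolean flag is true.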

To assign a MessageListener to a container, use the ContainerProperties.setMessageListener method when creating the container. The following example shows how to do so:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

Note that when you create a DefaultKafkaConsumerFactory with the constructor that takes only the properties, as above, the key and value Deserializer classes are picked up from the configuration. Alternatively, Deserializer instances can be passed to the DefaultKafkaConsumerFactory constructor for the key and/or value, in which case all consumers share the same instances. Another option is to provide Supplier<Deserializer> instances (starting with version 2.3) to obtain a separate Deserializer instance for each consumer:

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
// The container's type parameters must match those of the consumer factory
KafkaMessageListenerContainer<Integer, CustomValue> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

For more information about the various properties that can be set, see ContainerProperties in Javadoc.

As of Spring Kafka 2.1.1, a new property called logContainerConfig is available. When it is true and INFO logging is enabled, each listener container writes a log message that summarizes its configuration properties.

The log level for offset commits can also be changed. For example, to change it to INFO, you can use containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO).

As of Spring Kafka 2.2, a new container property named missingTopicsFatal has been added (default: true). It prevents the container from starting if any of the configured topics are not present on the broker. It does not apply if the container is configured to listen to a topic pattern (regex). Previously, the container threads looped within the consumer.poll() method, waiting for the topic to appear while logging many messages. Other than the logs, there was no indication of a problem. To restore the previous behavior, set the property to false. If the broker sets allow.auto.create.topics=true and the container property is false, a topic that does not exist is created automatically.

2.3.1.3 Using ConcurrentMessageListenerContainer

Its single constructor is similar to the first KafkaMessageListenerContainer constructor. The following listing shows the constructor's signature:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
        ContainerProperties containerProperties)

It also has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances. For the first constructor, Kafka uses its group management functionality to distribute partitions among the consumers.

When listening to multiple topics, the default partition distribution may not be what you expect. For example, if you have three topics with five partitions each and you set concurrency=15, you see only five active consumers, each assigned one partition from each topic, while the other ten consumers are idle. This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc). In this case, you may want to consider the RoundRobinAssignor instead, which distributes the partitions across all of the consumers, so that each consumer is assigned one partition. To change the PartitionAssignor, set the partition.assignment.strategy consumer property (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) in the properties provided to the DefaultKafkaConsumerFactory.
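The difference between the two strategies in this scenario can be reproduced with a small simulation. The sketch below is plain Java and only imitates the counting logic (range assignment works topic by topic, round-robin walks over all topic partitions); it is not the Kafka assignor code, it assumes every consumer subscribes to every topic, and all names in it are invented.

```java
import java.util.Arrays;

final class AssignorSketch {

    /** Range style: each topic's partitions are split over the consumers separately. */
    static int[] rangeCounts(int topics, int partitionsPerTopic, int consumers) {
        int[] counts = new int[consumers];
        int base = partitionsPerTopic / consumers;
        int extra = partitionsPerTopic % consumers;
        for (int t = 0; t < topics; t++) {
            for (int c = 0; c < consumers; c++) {
                // The first 'extra' consumers get one partition more, for every topic.
                counts[c] += base + (c < extra ? 1 : 0);
            }
        }
        return counts;
    }

    /** Round-robin style: all topic partitions are dealt out one by one. */
    static int[] roundRobinCounts(int topics, int partitionsPerTopic, int consumers) {
        int[] counts = new int[consumers];
        int total = topics * partitionsPerTopic;
        for (int p = 0; p < total; p++) {
            counts[p % consumers]++;
        }
        return counts;
    }

    /** Number of consumers that received at least one partition. */
    static long activeConsumers(int[] counts) {
        return Arrays.stream(counts).filter(n -> n > 0).count();
    }
}
```

With three topics, five partitions each, and fifteen consumers, the range-style count leaves ten consumers with nothing, while the round-robin count gives every consumer exactly one partition.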

When using Spring Boot, you can set the assignment strategy as follows:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

For the second constructor, ConcurrentMessageListenerContainer distributes the TopicPartition instances across the delegate KafkaMessageListenerContainer instances.

For example, if six TopicPartition instances are provided and the concurrency is 3, each container gets two partitions. For five TopicPartition instances, two containers get two partitions each and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down so that each container gets one partition. You can use the kafka-topics.sh command line tool to query and adjust the number of partitions on a topic. You can also add a NewTopic bean: if its partition count is greater than the topic's current partition count, Spring Boot's auto-configured KafkaAdmin increases the number of partitions.
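The arithmetic behind these examples can be sketched as follows. This is an illustrative plain-Java model, not the container's implementation; the class and method names are invented.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionDistributionSketch {

    /**
     * Returns how many partitions each delegate container receives when
     * explicitly listed partitions are spread over 'concurrency' containers.
     */
    static List<Integer> distribute(int partitions, int concurrency) {
        if (partitions < 1 || concurrency < 1) {
            throw new IllegalArgumentException("partitions and concurrency must be positive");
        }
        // Concurrency is adjusted down when there are fewer partitions than containers.
        int containers = Math.min(concurrency, partitions);
        int base = partitions / containers;
        int extra = partitions % containers;
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < containers; i++) {
            // The first 'extra' containers take one additional partition.
            counts.add(base + (i < extra ? 1 : 0));
        }
        return counts;
    }
}
```

Calling distribute(6, 3) yields two partitions per container, distribute(5, 3) yields two, two, and one, and distribute(2, 3) yields only two containers with one partition each.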

If set, the client.id property has -n appended, where n is the consumer instance that corresponds to the concurrency. This is required to provide unique names for MBeans when JMX is enabled.

Starting with Spring Kafka 1.3, MessageListenerContainer provides access to the metrics of the underlying KafkaConsumer. For ConcurrentMessageListenerContainer, the metrics() method returns the metrics for all the target KafkaMessageListenerContainer instances. The metrics are grouped into a Map<MetricName, ? extends Metric> by the client-id of the underlying KafkaConsumer.

Starting with version 2.3, ContainerProperties provides an idleBetweenPolls option that lets the main loop in the listener container sleep between KafkaConsumer.poll() calls. The actual sleep interval is the smaller of the configured value and the difference between the max.poll.interval.ms consumer configuration and the current record batch processing time.
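The sleep interval rule for idleBetweenPolls is simple arithmetic: take the smaller of the configured idle time and the time left before max.poll.interval.ms would be exceeded. The following plain-Java sketch models only that rule; it is not the container's code, the names are invented, and flooring the result at zero is an assumption of the sketch.

```java
final class IdleBetweenPollsSketch {

    /**
     * @param idleBetweenPollsMs configured idleBetweenPolls value
     * @param maxPollIntervalMs  the max.poll.interval.ms consumer setting
     * @param batchProcessingMs  time spent processing the current batch of records
     * @return how long the loop would sleep before the next poll()
     */
    static long actualSleepMs(long idleBetweenPollsMs, long maxPollIntervalMs,
                              long batchProcessingMs) {
        long remaining = maxPollIntervalMs - batchProcessingMs;
        // Assumption of this sketch: never return a negative sleep time.
        return Math.max(0L, Math.min(idleBetweenPollsMs, remaining));
    }
}
```

With max.poll.interval.ms at its Kafka default of 300000 ms and an idle setting of 5000 ms, a batch that took 1000 ms leaves the full 5000 ms of sleep, while a batch that took 298000 ms leaves only 2000 ms.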

2.3.1.4 Submitting an Offset

There are several options for committing offsets. If the enable.auto.commit consumer property is true, Kafka auto-commits the offsets according to its configuration. If it is false, the container supports several AckMode settings (described in the following list). The default AckMode is BATCH. Starting with version 2.3, the framework sets enable.auto.commit to false unless it is explicitly set in the configuration. Previously, the Kafka default (true) was used if the property was not set. The consumer poll() method returns one or more ConsumerRecords, and the MessageListener is called for each record. The following list describes the action taken by the container for each AckMode:

  • RECORD: Commit the offset when the listener returns after processing the record.
  • BATCH: Commit the offsets when all the records returned by poll() have been processed.
  • TIME: Commit the offsets when all the records returned by poll() have been processed, as long as the ackTime since the last commit has been exceeded.
  • COUNT: Commit the offsets when all the records returned by poll() have been processed, as long as ackCount records have been received since the last commit.
  • COUNT_TIME: Similar to TIME and COUNT, but the commit is performed if either condition is true.
  • MANUAL: The message listener is responsible for calling acknowledge() on the Acknowledgment. After that, the same semantics as BATCH apply.
  • MANUAL_IMMEDIATE: Commit the offset immediately when the listener calls Acknowledgment.acknowledge().

MANUAL and MANUAL_IMMEDIATE require the listener to be an AcknowledgingMessageListener or a BatchAcknowledgingMessageListener. See the message listeners section.
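The threshold-based modes are easy to confuse, so here is a small plain-Java model of when a commit is due once a poll() batch has been processed. It follows the Spring for Apache Kafka reference, where COUNT_TIME commits as soon as either threshold is reached. It is not the container's implementation, and the class, enum, and parameter names are invented.

```java
final class AckModeSketch {

    /** Only the threshold-based modes are modeled here. */
    enum Mode { COUNT, TIME, COUNT_TIME }

    /**
     * @param recordsSinceCommit records received since the last commit
     * @param millisSinceCommit  time elapsed since the last commit
     * @param ackCount           the container's ackCount setting
     * @param ackTimeMs          the container's ackTime setting
     */
    static boolean shouldCommit(Mode mode, int recordsSinceCommit, long millisSinceCommit,
                                int ackCount, long ackTimeMs) {
        boolean countReached = recordsSinceCommit >= ackCount;
        boolean timeExceeded = millisSinceCommit > ackTimeMs;
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeExceeded;
            case COUNT_TIME:
                // Either threshold is enough to trigger the commit.
                return countReached || timeExceeded;
            default:
                throw new IllegalArgumentException("Unsupported mode: " + mode);
        }
    }
}
```

For instance, with ackCount set to 10 and ackTime to 5000 ms, three records received over 6000 ms would trigger a commit under TIME and COUNT_TIME but not under COUNT.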

Depending on the syncCommits container property, the commitSync() or commitAsync() method on the consumer is used. syncCommits is true by default; also see setSyncCommitTimeout. See setCommitCallback to get the results of asynchronous commits; the default callback is LoggingCommitCallback, which logs errors (and successes at debug level).

Because the listener container has its own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to be false. Starting with version 2.3, it sets it to false unless it is specifically set in the consumer factory or in the container's consumer property overrides.

The Acknowledgment interface has the following method:

public interface Acknowledgment {
    void acknowledge();
}

This method allows the listener to control when the offset is committed.

As of version 2.3, the Acknowledgment interface has two additional methods, nack(long sleep) and nack(int index, long sleep). The first is for record listeners and the second is for batch listeners. Calling the wrong method for the listener type raises an IllegalStateException.

nack() can only be called on the consumer thread that invokes the listener.

With a batch listener, you can specify the index within the batch at which the failure occurred. When nack() is called, offsets are committed for the records before the index, and seeks are performed on the partitions for the failed and discarded records so that they are redelivered on the next poll(). This is an improvement over SeekToCurrentBatchErrorHandler, which can only seek the entire batch for redelivery.
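As the Spring for Apache Kafka reference describes nack(index, sleep), the index splits the batch in two: records before it count as processed and have their offsets committed, while the record at the index and everything after it are redelivered on the next poll(). The plain-Java sketch below models only that split; the class and method names are invented, and it does not call any Kafka API.

```java
import java.util.List;

final class BatchNackSketch {

    /** Records before the failed index: their offsets are committed. */
    static <T> List<T> committed(List<T> batch, int failedIndex) {
        return batch.subList(0, failedIndex);
    }

    /** The failed record and the ones after it: redelivered on the next poll(). */
    static <T> List<T> redelivered(List<T> batch, int failedIndex) {
        return batch.subList(failedIndex, batch.size());
    }
}
```

For a batch with offsets 10, 11, 12, and 13 and a failure at index 2, the first two records are committed and the last two come back, whereas seeking the whole batch would redeliver all four.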

Note: When using partition allocation through group management, it is important to ensure that the sleep parameter (plus the time taken to process the last poll record) is less than the consumer max.poll.interval.ms property.

2.3.1.5 Starting the Listener Container Automatically and Manually

The listener containers implement SmartLifecycle (a Spring interface whose implementations are started automatically after all beans have been loaded and initialized), and autoStartup is true by default. The containers are started in a late phase (Integer.MAX_VALUE - 100). Other components that implement SmartLifecycle and handle data from listeners should be started in an earlier phase. The - 100 leaves room for later phases, so that components can be started automatically after the containers. For example, when a listener container is registered with Spring as a @Bean, SmartLifecycle starts it automatically. A container instance that you create yourself with new is not started for you, so you need to call start() manually, for example on a KafkaMessageListenerContainer instance.

The autoStartup setting has no effect on a manual call to start(). See the section on @KafkaListener lifecycle management.

2.3.2 @KafkaListener Annotation

2.3.2.1 Record Listeners

The @KafkaListener annotation designates a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter, which is configured with various features, such as converters that convert the data, if necessary, to match the method parameters. You can configure most attributes on the annotation with property placeholders (${…}) or SpEL (#{…}). See the Javadoc for more information.

The main attributes of @KafkaListener:

  • id: unique ID of the listener container. If it is not specified, an ID is generated automatically. If it is specified, it also overrides the group.id configured in the consumer factory, unless groupId is set or idIsGroup is false
  • containerFactory: bean name of the KafkaListenerContainerFactory (for example a ConcurrentKafkaListenerContainerFactory) used to create the listener container
  • topics: the topics to listen on; each entry can be a topic name, a property placeholder key, or an expression, for example {"topic1", "topic2"}
  • topicPattern: the topic pattern for this listener. The entry can be a topic pattern, a property placeholder key, or an expression. The framework creates a container that subscribes to all topics matching the specified pattern and gets dynamically assigned partitions. Pattern matching is performed periodically against the topics that exist at the time of the check. The expression must resolve to the topic pattern (String or Pattern result types are supported). This uses group management, where Kafka assigns partitions to group members.
  • topicPartitions: used for manual topic/partition assignment
  • errorHandler: bean name of the listener's error handler (a KafkaListenerErrorHandler); none by default
  • groupId: consumer group ID
  • idIsGroup: whether the id is also used as the group ID
  • clientIdPrefix: prefix for the consumer client.id
  • beanRef: pseudo bean name used in SpEL expressions on the annotation to refer to the bean in which the listener is defined; defaults to "__listener"
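How id, groupId and idIsGroup interact is easy to get wrong, so here is a minimal JDK-only sketch of the resolution order as I understand it. It is a model, not the framework's code: effectiveGroupId and its parameters are hypothetical names, and factoryGroupId stands for the group.id configured on the consumer factory.

```java
// Simplified model of how the consumer group.id is chosen for a @KafkaListener.
final class GroupIdSketch {

    static boolean hasText(String s) {
        return s != null && !s.trim().isEmpty();
    }

    static String effectiveGroupId(String groupId, String id, boolean idIsGroup, String factoryGroupId) {
        if (hasText(groupId)) {
            return groupId;          // an explicit groupId always wins
        }
        if (idIsGroup && hasText(id)) {
            return id;               // the listener id doubles as the group.id
        }
        return factoryGroupId;       // otherwise fall back to the consumer factory setting
    }

    public static void main(String[] args) {
        System.out.println(effectiveGroupId("", "foo", true, "myGroup"));       // foo
        System.out.println(effectiveGroupId("orders", "foo", true, "myGroup")); // orders
        System.out.println(effectiveGroupId("", "foo", false, "myGroup"));      // myGroup
    }
}
```

In practice this means that two listeners with different id values and no groupId end up in different consumer groups, and each receives every message, unless idIsGroup is set to false.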

The @KafkaListener annotation provides a mechanism for simple POJO listeners. The following example shows how to use it:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }
}

This mechanism requires an @EnableKafka annotation on one of your @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer. By default, a bean named kafkaListenerContainerFactory is expected. The following example shows how to use ConcurrentMessageListenerContainer:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // ConsumerConfig and ProducerConfig both define this key as "bootstrap.servers"
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

Note that to set container properties, you must use the getContainerProperties() method on the factory. It is used as a template for the actual properties injected into the container.

As of version 2.1.1, you can set the client.id property for consumers created by the annotation. The clientIdPrefix is suffixed with -n, where n is an integer representing the container number when concurrency is used.

As of version 2.2, it is now possible to override the concurrency and auto-start properties of container factories by using the properties of the annotation itself. Properties can be simple values, property placeholders, or SpEL expressions. The following example demonstrates how to do this:

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets). The following example demonstrates how to do this:

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

You can specify each partition in either the partitions or the partitionOffsets attribute, but not both.

When using manual AckMode, you can also provide the listener with the Acknowledgment. The following example also shows how to use a different container factory:

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

Finally, metadata about the message can be obtained from the message header. You can retrieve header content using the following header names:

KafkaHeaders.OFFSET
KafkaHeaders.RECEIVED_MESSAGE_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE

Example:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}

2.3.2.2 Batch listener

Starting with version 1.1, @KafkaListener methods can be configured to receive the entire batch of consumer records returned by a consumer poll. To configure the listener container factory to create batch listeners, set the batchListener property. The following example demonstrates how to do this:

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}

The following example shows how to receive a payload list:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

Topics, partitions, offsets, and so on are available in headers that parallel the payloads. The following example shows how to use the headers:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}
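"Parallel" here means that the element at index i of every header list belongs to the payload at index i. A minimal JDK-only sketch of how the body of such a batch listener might walk the lists follows; the describe method and the sample values are hypothetical, and no Kafka classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Shows how payloads and header lists of a batch line up by index.
final class ParallelHeadersSketch {

    static List<String> describe(List<String> payloads, List<Integer> keys,
                                 List<Integer> partitions, List<Long> offsets) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < payloads.size(); i++) {
            // Index i of every list refers to the same consumer record.
            lines.add("key=" + keys.get(i)
                    + " partition=" + partitions.get(i)
                    + " offset=" + offsets.get(i)
                    + " payload=" + payloads.get(i));
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> lines = describe(
                Arrays.asList("a", "b"),
                Arrays.asList(1, 2),
                Arrays.asList(0, 0),
                Arrays.asList(41L, 42L));
        // Prints:
        // key=1 partition=0 offset=41 payload=a
        // key=2 partition=0 offset=42 payload=b
        for (String line : lines) {
            System.out.println(line);
        }
    }
}
```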

Alternatively, you can receive a list of Message<?> objects, with each offset and the other details carried in each message, but it must be the only parameter (apart from an optional Acknowledgment, when using manual commits, and/or an optional Consumer<?, ?> parameter). The following example shows how to do this:

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

In this case, no conversion is performed on the payloads. If the BatchMessagingMessageConverter is configured with a RecordMessageConverter, you can also add a generic type to the Message parameter, and the payloads are converted. For more information, see Payload Conversion with Batch Listeners.

You can also receive a list of ConsumerRecord<?, ?> objects, but it must be the only parameter (apart from an optional Acknowledgment, when using manual commits, and an optional Consumer<?, ?> parameter). The following example demonstrates how to do this:

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

Starting with version 2.2, the listener can receive the complete ConsumerRecords<?, ?> object, which gives it access to additional methods, such as partitions() (which returns the TopicPartition instances in the list) and records(TopicPartition) (which retrieves the records of a selected partition). Again, this must be the only parameter (apart from an optional Acknowledgment, when using manual commits, and an optional Consumer<?, ?> parameter). The following example demonstrates how to do this:

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}

2.3.3 @KafkaListener @Payload validation

Starting with version 2.2, it is easier to add a Validator to validate @KafkaListener @Payload arguments. Previously, you had to configure a custom DefaultMessageHandlerMethodFactory and add it to the registrar. Now, you can add the validator to the registrar itself. The following code shows how to do this:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(new MyValidator());
    }
}

When you use Spring Boot with the validation starter, a LocalValidatorFactoryBean is auto-configured, as the following example shows:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;

    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(this.validator);
    }
}

The following example demonstrates how to validate:

public static class ValidatedClass {

  @Max(10)
  private int bar;

  public int getBar() {
    return this.bar;
  }

  public void setBar(int bar) {
    this.bar = bar;
  }
}

@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

2.3.4 Rebalancing listeners

ContainerProperties has a property called consumerRebalanceListener, which takes an implementation of the Kafka client's ConsumerRebalanceListener interface. If this property is not provided, the container configures a logging listener that logs rebalance events at the INFO level. The framework also adds a sub-interface, ConsumerAwareRebalanceListener. The following listing shows the ConsumerAwareRebalanceListener interface definition:

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}

2.3.5 Forwarding listener messages

Starting with version 2.0, if you also annotate a @KafkaListener with a @SendTo annotation and the method invocation returns a result, the result is forwarded to the topic specified by @SendTo. For example:

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}

...

@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }
}

2.3.6 @KafkaListener lifecycle management

The listener containers created for @KafkaListener annotations are not beans in the application context. Instead, they are registered with an infrastructure bean of type KafkaListenerEndpointRegistry. This bean is declared automatically by the framework and manages the containers' lifecycles; it automatically starts any container that has autoStartup set to true. All containers created by all container factories must be in the same phase. For more information, see the section on automatic and manual startup of listener containers (2.3.1.5). You can manage the lifecycle programmatically by using the registry. Starting or stopping the registry starts or stops all the registered containers. Alternatively, you can get a reference to an individual container by using its id attribute. You can set autoStartup on the annotation, which overrides the default setting configured in the container factory (setAutoStartup(true)). You can obtain a reference to the registry bean from the application context, for example by autowiring, to manage its registered containers. The following example illustrates how to do this:

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) {
    ...
}

@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

The registry only maintains the lifecycle of the containers it manages; containers declared as beans are not managed by the registry and can be obtained from the application context. A collection of managed containers can be obtained by calling the registry's getListenerContainers() method. Spring Kafka version 2.2.5 added a convenience method, getAllListenerContainers(), which returns a collection of all containers, both those managed by the registry and those declared as beans. The returned collection includes any prototype beans that have been initialized, but it does not initialize any lazy bean declarations.

2.4 Stream processing

Spring for Apache Kafka provides a factory bean to create a StreamsBuilder object and manage the lifecycle of its streams. As long as kafka-streams is on the classpath and Kafka Streams is enabled through the @EnableKafkaStreams annotation, Spring Boot automatically configures the required KafkaStreamsConfiguration bean.

Enabling Kafka Streams means that the application ID and the bootstrap servers must be set. The former can be configured with spring.kafka.streams.application-id and defaults to spring.application.name if not set. The latter can be set globally or overridden specifically for streams.

Several additional properties are available as dedicated properties; any other Kafka property can be set through the spring.kafka.streams.properties namespace. For more information, see section 2.5, Additional Configuration.

By default, the streams managed by the StreamsBuilder object it creates are started automatically. You can customize this behavior with the spring.kafka.streams.auto-startup property.

To use the factory bean, simply wire the StreamsBuilder into your @Bean, as shown in the following example:

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public static class KafkaStreamsExampleConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
                Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }
}

2.5 Additional Configuration

The properties supported by auto-configuration are listed in the common application properties. Note that, for the most part, these properties (hyphenated or camelCase) map directly to the Apache Kafka dotted properties. Refer to the Apache Kafka documentation for details.

The properties mentioned earlier apply to all components (producers, consumers, admins, and streams), but they can be specified at the component level if you want to use different values. Apache Kafka designates properties with an importance of HIGH, MEDIUM, or LOW. Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value.

Only a subset of the properties supported by Kafka is available directly through the KafkaProperties class. To configure the producer or consumer with additional properties that are not directly supported, use the following properties:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

The example above sets the common prop.one Kafka property to first (it applies to producers, consumers, and admins), the prop.two admin property to second, the prop.three consumer property to third, the prop.four producer property to fourth, and the prop.five streams property to fifth.
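The layering of common and component-level properties can be pictured as a simple map merge. The following JDK-only sketch is a model under that assumption, not Spring Boot's code: the effective method is hypothetical, and it only shows that a component sees the common properties plus its own, with its own values taking precedence.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model of layering common and component-specific Kafka properties.
final class PropertyLayeringSketch {

    // Component-specific entries are applied last, so they override common ones.
    static Map<String, String> effective(Map<String, String> common, Map<String, String> component) {
        Map<String, String> merged = new TreeMap<>(common);
        merged.putAll(component);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> common = new TreeMap<>();
        common.put("prop.one", "first");          // spring.kafka.properties.prop.one

        Map<String, String> consumer = new TreeMap<>();
        consumer.put("prop.three", "third");      // spring.kafka.consumer.properties.prop.three

        // Prints {prop.one=first, prop.three=third}
        System.out.println(effective(common, consumer));

        // A component-level value for the same key replaces the common one.
        consumer.put("prop.one", "consumer-only"); // hypothetical override
        // Prints {prop.one=consumer-only, prop.three=third}
        System.out.println(effective(common, consumer));
    }
}
```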

You can also configure Spring Kafka JsonDeserializer, as shown below:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

Similarly, you can disable JsonSerializer’s default behavior for sending type information in the header:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false

Note: Properties set in this way will override any configuration items explicitly supported by Spring Boot.

2.6 Testing with Embedded Kafka

Spring for Apache Kafka provides a convenient way to test projects with an embedded Apache Kafka broker. To use this feature, annotate the test class with @EmbeddedKafka from the spring-kafka-test module. See the Spring for Apache Kafka reference manual for more information.

To make Spring Boot auto-configuration work with the embedded Apache Kafka broker mentioned above, you need to remap the system property for the embedded broker addresses (populated by the EmbeddedKafkaBroker) to the Spring Boot configuration property for Apache Kafka. There are several ways to do this:

  • Provide a system property in the test class that maps the embedded broker addresses to spring.kafka.bootstrap-servers:
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
  • Configure the property name on the @EmbeddedKafka annotation:
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  • Use a placeholder in the configuration properties:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
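The first and third options rely on an indirection that is easy to misread: the first tells the embedded broker under which property name to publish its addresses, while the third reads the name the broker uses by default. The JDK-only sketch below models that indirection with a local Properties object instead of real system properties. It is a simplification, not the spring-kafka-test implementation; the two property keys are written as I understand them and should be treated as assumptions, and publishBrokers and resolve are hypothetical helpers.

```java
import java.util.Properties;

// Simplified model of how the embedded broker address reaches Spring Boot's property.
final class BrokerPropertySketch {

    // Assumed key that names the target property (cf. EmbeddedKafkaBroker.BROKER_LIST_PROPERTY).
    static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";
    // Property the broker populates when no other name is configured.
    static final String DEFAULT_BROKERS_KEY = "spring.embedded.kafka.brokers";

    // Models the broker publishing its address list under the configured key.
    static void publishBrokers(Properties props, String brokers) {
        String target = props.getProperty(BROKER_LIST_PROPERTY, DEFAULT_BROKERS_KEY);
        props.setProperty(target, brokers);
    }

    // Resolves a value that consists of a single ${key} placeholder.
    static String resolve(String value, Properties props) {
        if (value.startsWith("${") && value.endsWith("}")) {
            return props.getProperty(value.substring(2, value.length() - 1));
        }
        return value;
    }

    public static void main(String[] args) {
        // Option 1: remap the target property before the broker starts.
        Properties remapped = new Properties();
        remapped.setProperty(BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
        publishBrokers(remapped, "127.0.0.1:9092");
        System.out.println(remapped.getProperty("spring.kafka.bootstrap-servers")); // 127.0.0.1:9092

        // Option 3: keep the default key and reference it through a placeholder.
        Properties defaults = new Properties();
        publishBrokers(defaults, "127.0.0.1:9092");
        System.out.println(resolve("${spring.embedded.kafka.brokers}", defaults));  // 127.0.0.1:9092
    }
}
```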

2.7 Spring Integration support

Spring Integration also has an adapter for Kafka, so you can easily use Spring Integration to implement publish and subscribe. Of course, you don’t have to use Spring Integration.

For what Spring Integration is and what it does, see another article, “The Most Detailed Explanation of Spring Integration”.

3 Spring Kafka configuration parameters

Spring Kafka configuration is divided into global configuration and per-module configuration, and the per-module configuration overrides the global configuration. For example, SSL authentication can be configured globally, but it can also be configured separately for each module, such as the consumer, the producer, and stream processing (consumers and producers, for instance, may not live in the same application). This section focuses on the producer and consumer configuration; the remaining modules are only listed and can be looked up and supplemented as needed.

3.1 Global Configuration

# Comma-separated list of host:port pairs used to establish the initial connections to the Kafka cluster
spring.kafka.bootstrap-servers
# ID passed to the server when making requests; used for server-side logging. None by default
spring.kafka.client-id
# Additional properties used to configure the client, shared by producers and consumers
spring.kafka.properties.*
# Default topic to which messages are sent. None by default
spring.kafka.template.default-topic

3.2 Producer

In Spring Boot, the Kafka producer configuration (all properties are prefixed with spring.kafka.producer.):

# Number of acknowledgments the producer requires the leader to have received before considering a request complete
spring.kafka.producer.acks
# Default batch size. A smaller batch size makes batching less common and may reduce throughput (a batch size of zero disables batching entirely)
spring.kafka.producer.batch-size
spring.kafka.producer.bootstrap-servers
# Total amount of memory the producer can use to buffer records waiting to be sent to the server
spring.kafka.producer.buffer-memory
# ID passed to the server when making requests; used for server-side logging
spring.kafka.producer.client-id
# Compression type for all data generated by the producer
spring.kafka.producer.compression-type
# Serializer class for keys
spring.kafka.producer.key-serializer
spring.kafka.producer.properties.*
# When greater than zero, enables retrying of failed sends
spring.kafka.producer.retries
spring.kafka.producer.ssl.key-password
spring.kafka.producer.ssl.key-store-location
spring.kafka.producer.ssl.key-store-password
spring.kafka.producer.ssl.key-store-type
spring.kafka.producer.ssl.protocol
spring.kafka.producer.ssl.trust-store-location
spring.kafka.producer.ssl.trust-store-password
spring.kafka.producer.ssl.trust-store-type
# When non-empty, enables transaction support for the producer
spring.kafka.producer.transaction-id-prefix
spring.kafka.producer.value-serializer

3.3 Consumer

In Spring Boot, the Kafka consumer configuration (all properties are prefixed with spring.kafka.consumer.):

# If enable.auto.commit is set to true, the frequency with which consumer offsets are automatically committed to Kafka. Default: none; unit: ms
spring.kafka.consumer.auto-commit-interval
# What to do when there is no committed offset. Default: latest. Allowed values: latest, earliest, none
# earliest: if a partition has a committed offset, consume from that offset; if not, consume from the beginning
# latest: if a partition has a committed offset, consume from that offset; if not, consume only data newly produced to the partition
# none: if every partition of the topic has a committed offset, consume from those offsets; if any partition has no committed offset, throw an exception
spring.kafka.consumer.auto-offset-reset
# Comma-separated list of host:port pairs used to establish the initial connections to the Kafka cluster. Overrides the global connection setting
spring.kafka.consumer.bootstrap-servers
# ID passed to the server when making requests; used for server-side logging
spring.kafka.consumer.client-id
# Whether the consumer's offset is committed periodically in the background
spring.kafka.consumer.enable-auto-commit
# Maximum amount of time the server blocks before answering a fetch request if there is not enough data to immediately satisfy the "fetch-min-size" requirement
spring.kafka.consumer.fetch-max-wait
# Minimum amount of data the server should return for a fetch request
spring.kafka.consumer.fetch-min-size
# Unique string that identifies the consumer group to which this consumer belongs
spring.kafka.consumer.group-id
# Expected time between heartbeats to the consumer coordinator
spring.kafka.consumer.heartbeat-interval
# Isolation level for reading messages that have been written transactionally
spring.kafka.consumer.isolation-level
# Deserializer class for keys
spring.kafka.consumer.key-deserializer
# Maximum number of records returned in a single call to poll()
spring.kafka.consumer.max-poll-records
# Additional consumer-specific properties used to configure the client
spring.kafka.consumer.properties.*
# Password of the private key in the key store file
spring.kafka.consumer.ssl.key-password
# Location of the key store file
spring.kafka.consumer.ssl.key-store-location
# Store password for the key store file
spring.kafka.consumer.ssl.key-store-password
# Type of the key store, e.g. JKS
spring.kafka.consumer.ssl.key-store-type
# SSL protocol to use, such as TLSv1.2, TLSv1.1, TLSv1
spring.kafka.consumer.ssl.protocol
# Location of the trust store file
spring.kafka.consumer.ssl.trust-store-location
# Store password for the trust store file
spring.kafka.consumer.ssl.trust-store-password
# Type of the trust store
spring.kafka.consumer.ssl.trust-store-type
# Deserializer class for values
spring.kafka.consumer.value-deserializer
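The three auto-offset-reset values described in the comments above boil down to one decision per partition. The following JDK-only sketch spells that decision out. It is a model only: startPosition and its parameters are hypothetical, beginning and end stand for the first and next offsets of the partition, and the real client also applies the policy when a committed offset is out of range, which is not modeled here.

```java
import java.util.OptionalLong;

// Simplified model of where a consumer starts reading a partition.
final class OffsetResetSketch {

    static long startPosition(OptionalLong committed, String policy, long beginning, long end) {
        if (committed.isPresent()) {
            return committed.getAsLong();   // a committed offset always wins
        }
        switch (policy) {
            case "earliest":
                return beginning;           // read everything still in the partition
            case "latest":
                return end;                 // read only newly produced records
            case "none":
                throw new IllegalStateException("no committed offset and auto-offset-reset=none");
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startPosition(OptionalLong.empty(), "earliest", 0L, 42L)); // 0
        System.out.println(startPosition(OptionalLong.empty(), "latest", 0L, 42L));   // 42
        System.out.println(startPosition(OptionalLong.of(7L), "latest", 0L, 42L));    // 7
    }
}
```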

3.4 Listener

In Spring Boot, the Kafka listener configuration (all properties are prefixed with spring.kafka.listener.):

# Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME"
spring.kafka.listener.ack-count
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.client-id
spring.kafka.listener.concurrency
spring.kafka.listener.idle-event-interval
spring.kafka.listener.log-container-config
# Whether the container should fail to start if at least one of the configured topics is not present on the broker.
# This setting works together with the broker setting allow.auto.create.topics=true: if it is false, topics that do not exist are created automatically
spring.kafka.listener.missing-topics-fatal=true
# Time between checks for non-responsive consumers. If no duration suffix is specified, seconds are used
spring.kafka.listener.monitor-interval
spring.kafka.listener.no-poll-threshold
spring.kafka.listener.poll-timeout
spring.kafka.listener.type
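To make the relationship between ack-mode, ack-count and ack-time more concrete, here is a JDK-only sketch of the commit decision for the count- and time-based modes. It is a model, not the container's code: shouldCommit and its parameters are hypothetical, and the TIME and COUNT_TIME behavior (commit when the time has elapsed, or when either condition holds) follows my reading of the Spring for Apache Kafka documentation rather than the text above.

```java
// Simplified model of when offsets are committed for count/time based ack modes.
final class AckModeSketch {

    enum Mode { COUNT, TIME, COUNT_TIME }

    static boolean shouldCommit(Mode mode, int recordsSinceCommit, long millisSinceCommit,
                                int ackCount, long ackTimeMillis) {
        boolean countReached = recordsSinceCommit >= ackCount;
        boolean timeReached = millisSinceCommit >= ackTimeMillis;
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeReached;
            default:
                return countReached || timeReached; // COUNT_TIME: either condition is enough
        }
    }

    public static void main(String[] args) {
        // 10 records processed, 200 ms elapsed; ack-count=50, ack-time=100 ms
        System.out.println(shouldCommit(Mode.COUNT, 10, 200, 50, 100));      // false
        System.out.println(shouldCommit(Mode.TIME, 10, 200, 50, 100));       // true
        System.out.println(shouldCommit(Mode.COUNT_TIME, 10, 200, 50, 100)); // true
    }
}
```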

3.5 Admin

spring.kafka.admin.client-id
# Whether to fail fast if the broker is not available on startup
spring.kafka.admin.fail-fast=false
spring.kafka.admin.properties.*
spring.kafka.admin.ssl.key-password
spring.kafka.admin.ssl.key-store-location
spring.kafka.admin.ssl.key-store-password
spring.kafka.admin.ssl.key-store-type
spring.kafka.admin.ssl.protocol
spring.kafka.admin.ssl.trust-store-location
spring.kafka.admin.ssl.trust-store-password
spring.kafka.admin.ssl.trust-store-type

3.6 Authorization Service (JAAS)

spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=false
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule
spring.kafka.jaas.options.*


3.7 SSL

spring.kafka.ssl.key-password
spring.kafka.ssl.key-store-location
spring.kafka.ssl.key-store-password
spring.kafka.ssl.key-store-type
spring.kafka.ssl.protocol
spring.kafka.ssl.trust-store-location
spring.kafka.ssl.trust-store-password
spring.kafka.ssl.trust-store-type

3.8 Streams

spring.kafka.streams.application-id
spring.kafka.streams.auto-startup
spring.kafka.streams.bootstrap-servers
spring.kafka.streams.cache-max-size-buffering
spring.kafka.streams.client-id
spring.kafka.streams.properties.*
spring.kafka.streams.replication-factor
spring.kafka.streams.ssl.key-password
spring.kafka.streams.ssl.key-store-location
spring.kafka.streams.ssl.key-store-password
spring.kafka.streams.ssl.key-store-type
spring.kafka.streams.ssl.protocol
spring.kafka.streams.ssl.trust-store-location
spring.kafka.streams.ssl.trust-store-password
spring.kafka.streams.ssl.trust-store-type
spring.kafka.streams.state-dir

4 Review of basic Kafka publish/subscribe features

  • All consumers in the same consumer group cooperatively consume all partitions of the subscribed topics
    • If several consumers in the same consumer group subscribe to a topic with a single partition, the partition is assigned to only one of them; only if that consumer goes down is the partition reassigned to another consumer. In other words, the other consumers stand by and watch
    • If N consumers in the same consumer group subscribe to a topic with N partitions, each consumer is assigned one partition by default
    • If N consumers in the same consumer group subscribe to a topic with M partitions, then when M > N some consumers are assigned more than one partition, and when M < N some consumers are idle, as in the first case
    • All of these consumer instances can run as threads or as processes. The process of (re)assigning partitions to consumers is called rebalancing
    • A change in the number of group members triggers a rebalance, as does a change in the number of subscribed topics or in the number of partitions of a subscribed topic
    • In short, within a group a partition can be assigned to only one consumer, while a consumer can be assigned several partitions
  • Consumer offset management
    • Every message in a topic partition has a unique offset, and consumers read messages in offset order. For each message a consumer consumes, its offset is advanced by one and recorded locally; the record is synchronized to the broker periodically, and the synchronization mechanism is configurable
    • Messages are persisted. When all consumers in a group subscribe to the topic again, you can configure whether to consume from the beginning or from the last recorded offset
  • How to choose the number of partitions and consumers
    • Topic partitions are distributed across different brokers; with one consumer per partition, message processing reaches high throughput
    • The partition is the smallest unit for tuning Kafka parallelism. Multithreaded consumers connect to multiple partitions to consume messages; since each connection is implemented over a socket, this consumes file handles
    • Creating partitions takes up a certain amount of memory, so more partitions is not always better; the Kafka community is, however, optimizing this area so that the partition count can grow as far as possible without hurting performance
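The relationship between the number of consumers N and the number of partitions M described in the list above can be illustrated with a tiny JDK-only sketch. It distributes partitions round-robin over the consumers of one group. This is only an illustration of the counting argument, not Kafka's actual assignor, and the assign method is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates how M partitions spread over N consumers of one group.
final class AssignmentSketch {

    // Returns, for each consumer index, the list of partition numbers it owns.
    static List<List<Integer>> assign(int consumers, int partitions) {
        if (consumers <= 0) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(p % consumers).add(p); // each partition has exactly one owner
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 5)); // M > N: [[0, 3], [1, 4], [2]]
        System.out.println(assign(3, 3)); // M = N: [[0], [1], [2]]
        System.out.println(assign(3, 1)); // M < N: [[0], [], []] (two idle consumers)
    }
}
```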

How to tune replicas, partitions, consumers, and so on is not covered here; a later article will be devoted to that topic.

5 Publish and subscribe example

The environment required to implement the following examples:

  • A Kafka + Zookeeper single-node server or cluster that is already set up (if you are not familiar with the setup, see the article on Kafka environment setup and testing), or the embedded Kafka server provided by spring-kafka-test
  • Spring Boot development environment (2.2.1)
    • JDK (1.8 or above)
    • STS (4.4 RELEASE)
    • Maven as the build tool

5.1 Using Embedded Kafka Server

Kafka itself is written in Scala and depends on Zookeeper. You can download the distribution package from the official website and deploy it locally. However, Spring Kafka Test wraps Kafka for testing and can start a Kafka server with a single annotation, which simplifies verifying Kafka-related functionality during development and is very simple to use.

Add dependencies:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka-test</artifactId>
	<scope>test</scope>
</dependency>

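Start the Kafka server directly from a JUnit test case. The following starts four broker nodes; run it as a JUnit test:

import java.io.IOException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4, ports = {9092, 9093, 9094, 9095})
public class ApplicationTests {

    @Test
    public void contextLoads() throws IOException {
        // Block on standard input so the embedded brokers keep running
        System.in.read();
    }
}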

The following parameters can be set in @EmbeddedKafka:

  • value: the number of brokers to create
  • count: alias for value
  • ports: list of broker ports
  • brokerPropertiesLocation: the broker configuration file, such as "classpath:application.properties"

Note: by default @EmbeddedKafka does not create any topics, so a listener on a topic that does not exist fails at startup with the error "Topic(s) [test] is/are not present and missingTopicsFatal is true". With no parameters, @EmbeddedKafka creates a single broker on a random port; the specific port and the default configuration items are output in the startup log.
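
One way to avoid the missing-topic error is to let the embedded broker create the topic up front. The following is a minimal sketch, not the article's own code: the class name is hypothetical, it assumes spring-kafka-test and JUnit 4 on the test classpath, and it relies on the topics and partitions attributes of @EmbeddedKafka and on the spring.embedded.kafka.brokers property that the embedded broker publishes.

```java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
// Point Spring Boot's Kafka auto-configuration at the embedded broker (random port)
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
// Create the "test" topic with two partitions when the embedded broker starts
@EmbeddedKafka(topics = {"test"}, partitions = 2)
public class EmbeddedKafkaTopicTests {

    @Test
    public void contextLoads() {
        // Passes when the application context, including any listeners on "test", starts cleanly
    }
}
```

Because no ports are given, the broker picks a free port, which keeps the test from colliding with a locally running Kafka.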

5.2 Simple Publish and Subscribe Implementation (No Custom Configuration)

The following implements simple publish and subscribe: a web front end calls an API; when the API controller receives the request, the producer sends a message; the consumer listens in the background and prints any message it receives.

5.2.1 Adding dependencies and Configuring Kafka

Add a Kafka dependency:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>			
</dependency>

Configure the Kafka brokers' service addresses and ports in application.yml:

server:
  port: 9000
spring:
  kafka:
    bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094
    listener:
      # Do not fail when a listened-to topic does not exist. With this set to false, and if the
      # broker has auto.create.topics.enable=true, producers automatically create a missing topic
      # when they send to it; a topic created this way has a single replica and a single partition by default
      missing-topics-fatal: false
    consumer:
      # Offset reset policy: with earliest, a consumer that has no committed offset starts from
      # the beginning of the partition and receives the earlier messages
      auto-offset-reset: earliest
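
The auto-offset-reset setting only matters when the consumer group has no usable committed offset. The plain-Java sketch below models that decision; it is an illustration of the rule rather than Kafka's implementation, and the class, enum and method names are hypothetical.

```java
import java.util.OptionalLong;

/** Illustrative only: decides where a consumer starts reading one partition. */
final class OffsetResetSketch {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * @param committed last committed offset of the group for this partition, if any
     * @param logStart  offset of the oldest message still retained
     * @param logEnd    offset the next produced message will get
     */
    static long startOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        // A committed offset that still lies inside the retained log always wins
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            return committed.getAsLong();
        }
        // Otherwise the reset policy decides
        switch (policy) {
            case EARLIEST:
                return logStart; // replay everything that is still retained
            case LATEST:
                return logEnd;   // only messages produced from now on
            default:
                throw new IllegalStateException("No valid committed offset and reset policy is NONE");
        }
    }
}
```

With earliest, a brand-new group therefore sees the messages that were already in the topic, while a group that has committed offset 17 simply resumes at 17 regardless of the policy.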

5.2.2 Adding a Producer

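import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private static final Logger LOGGER = LogManager.getLogger(Producer.class);
    // Must match the topic the consumer listens on
    private static final String TOPIC = "test";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        LOGGER.info("===Producing message: {}", message);
        this.kafkaTemplate.send(TOPIC, message);
    }
}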

5.2.3 Adding consumers

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

    private static final Logger LOGGER = LogManager.getLogger(Consumer.class);

    @KafkaListener(topics = "test", groupId = "group_test")
    public void consume(String message) {
        LOGGER.info("#### -> Consumed message -> {}", message);
    }
}

5.2.4 Adding a WEB Controller

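import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    // GET /kafka/publish?message=... hands the message to the producer
    @GetMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
}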

5.2.5 Test

Add Spring Boot Application:

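import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TestKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestKafkaApplication.class, args);
    }
}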

Once the Kafka brokers are started, you need to create the topic manually (to have it created automatically, use KafkaAdmin, or set auto.create.topics.enable=true on the Kafka broker and listener.missing-topics-fatal=false in the application):

# If you are not familiar with kafka-topics, see the previous article on Kafka (environment setup and testing).
# Create the test topic
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test
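
As an alternative to the manual command, the KafkaAdmin route mentioned above can be sketched as a small configuration class. This is a minimal sketch assuming Spring Boot's Kafka auto-configuration is active (it provides the KafkaAdmin bean); the class name TopicConfig is hypothetical, and the partition and replica counts simply mirror the command above.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    // The auto-configured KafkaAdmin creates every NewTopic bean that does not
    // yet exist on the broker when the application context starts.
    @Bean
    public NewTopic testTopic() {
        // Arguments: topic name, number of partitions, replication factor
        return new NewTopic("test", 2, (short) 1);
    }
}
```

Topics that already exist are left as they are, so the bean is safe to keep in the application after the first start.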

Test it in a browser:

http://localhost:9000/kafka/publish?message=hello

The application console prints the consumed message hello. The whole publish-subscribe implementation uses only two Kafka-related pieces: the @KafkaListener annotation to receive messages and KafkaTemplate to send them. It is that simple.

5.3 Implement publishing and subscription based on user-defined configuration

Section 5.2 relied entirely on Spring Boot's auto-configuration of Spring Kafka. This section instead uses the custom configuration file approach described in our previous article, "Spring Boot from Zero 7: Latest Configuration File Configuration and Priority Details", to implement publish and subscribe functionality.

The implementation content includes:

  • A custom Kafka configuration parameter file (not application.properties/yml)
  • Multiple producers (each producer is a single service with a single thread) and multiple consumers (message listening implemented without @KafkaListener)
  • Support for SSL security configuration
  • Monitoring of the producer (send success and failure callbacks)

The full source code is not posted here; only the main parts are shown.

Configuration class:

@Configuration
@ConfigurationProperties(prefix = "m2kc")
@PropertySource("classpath:kafka.properties")
@Validated
public class M2KCKafkaConfig {

    @Value("${m2kc.kafka.bootstrap.servers}")
    private String kafkaBootStrapServers;

    @Value("${m2kc.kafka.key.serializer.class}")
    private String kafkaKeySerializerClass;

    // ... (remaining fields, getters and setters omitted in the original)
}
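
For reference, kafka.properties needs at least the two keys read by the @Value annotations above. The sketch below shows those key names with placeholder values (localhost:9092 and Kafka's StringSerializer are assumptions) and parses them with plain java.util.Properties rather than Spring. The remaining keys of the original file are not shown in this article and are not guessed here; the class name is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Illustrative only: the two property keys referenced by M2KCKafkaConfig, with placeholder values. */
final class KafkaPropertiesSketch {

    // Hypothetical contents of kafka.properties; only the key names come from the article
    static final String SAMPLE =
            "m2kc.kafka.bootstrap.servers=localhost:9092\n"
          + "m2kc.kafka.key.serializer.class=org.apache.kafka.common.serialization.StringSerializer\n";

    /** Parses properties-file text the same way a .properties file on the classpath is read. */
    static Properties load(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = load(SAMPLE);
        System.out.println(properties.getProperty("m2kc.kafka.bootstrap.servers"));
        System.out.println(properties.getProperty("m2kc.kafka.key.serializer.class"));
    }
}
```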

Producers:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaProducer {
    private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class);
    private String mTopic = "test";
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaTemplate<String, String> mKafkaTemplate;
   
    @Autowired
    public KafkaProducer(M2KCKafkaConfig kafkaConfig) {
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;     
        mKafkaTemplate = getKafkaTemplate();
    }

    public KafkaTemplate<String, String> getKafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
        return kafkaTemplate;
    }

    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass());        
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());            
        }

        return new DefaultKafkaProducerFactory<String, String>(properties);
    }
    
    public void sendMessage(String msg) {
        LOGGER.info("===Producing message[{}]: {}", mTopic, msg);       
        ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.info("===Producing message success");  
            }

            @Override
            public void onFailure(Throwable ex) {
                // Log at error level and keep the cause
                LOGGER.error("===Producing message failed", ex);
            }
        });
    }
}
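
The ListenableFuture callback style above belongs to the Spring Kafka 2.x line used in this article. In Spring Kafka 3.x, KafkaTemplate.send returns a CompletableFuture instead, so the same success and failure handling would be attached with whenComplete. A minimal plain-Java sketch of that pattern follows; fakeSend is a hypothetical stand-in for the template, not a Spring API, and the returned metadata string is invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative only: one callback that handles both outcomes of an asynchronous send. */
final class SendCallbackSketch {

    /** Hypothetical stand-in for an asynchronous send; it fails for a null message. */
    static CompletableFuture<String> fakeSend(String topic, String message) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (message == null) {
            future.completeExceptionally(new IllegalArgumentException("message must not be null"));
        } else {
            future.complete(topic + "-0@0"); // pretend metadata: topic-partition@offset
        }
        return future;
    }

    /** Sends and returns what the callback reported. */
    static String sendAndReport(String topic, String message) {
        AtomicReference<String> report = new AtomicReference<>();
        // fakeSend returns an already completed future, so the callback runs before this method returns
        fakeSend(topic, message).whenComplete((metadata, ex) -> {
            if (ex == null) {
                report.set("success: " + metadata);
            } else {
                report.set("failed: " + ex.getMessage());
            }
        });
        return report.get();
    }

    public static void main(String[] args) {
        System.out.println(sendAndReport("test", "hello")); // success: test-0@0
        System.out.println(sendAndReport("test", null));    // failed: message must not be null
    }
}
```

With a real broker the future completes later on another thread, so the callback should only log or record the outcome and never block.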

Consumer:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaConsumer implements InitializingBean {
    private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class);

    private String mTopic;
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer; 

    @Autowired
    public KafkaConsumer(M2KCKafkaConfig kafkaConfig) {
        LOGGER.info("===KafkaConsumer construct");
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;
    }
    
    @PostConstruct
    public void start(){
        LOGGER.info("===KafkaConsumer start");        
    }
    
    @Override  
    public void afterPropertiesSet() throws Exception {          
        LOGGER.info("===afterPropertiesSet is called");      
        createContainer();
    }  

    private void createContainer() {
        mKafkaMessageListenerContainer =  createKafkaMessageListenerContainer();
        mKafkaMessageListenerContainer.setAutoStartup(false);
        mKafkaMessageListenerContainer.start();
        LOGGER.info("===Started container: {}", mKafkaMessageListenerContainer);
    }
    
    private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() {
        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(),
                createContainerProperties());
        LOGGER.info("===createKafkaMessageListenerContainer");
        return container;
    }
   
    private ContainerProperties createContainerProperties() {
        ContainerProperties containerProps = new ContainerProperties(mTopic);
        containerProps.setMessageListener(createMessageListener());
        return containerProps;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID());
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());
        }

        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }

    private MessageListener<String, String> createMessageListener() {
        return new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> data) {
                LOGGER.info("===Consuming msg: {}", data.value());
            }
        };
    }
}

InitializingBean is implemented here only for initialization; it can be removed by moving the initialization into the constructor. Both the consumer and the producer use @Scope with the prototype scope, so you need to obtain the instances manually by calling getBean() on the application context. Note that the configuration class shown above is incomplete.

5.4 Publish and subscribe implementations based on Spring Integration

Spring Integration also provides adapters for Kafka. With Spring Integration we can quickly implement publish and subscribe, as well as batch consumption by multiple consumers in a group:

  • Implement a custom Kafka configuration class
  • Use Spring Integration
  • Publish and subscribe
  • Batch consumption by multiple consumers in a group
  • Written with the Java DSL (domain-specific language)
  • Handling of producer publish success and failure

We can start by looking at the Kafka messaging channel as a whole:

  • The outbound channel adapter KafkaProducerMessageHandler is used to send messages to a topic
  • KafkaMessageDrivenChannelAdapter is used to set up the inbound channel and message handling

For a concrete demo, see the sample on GitHub:

  • Github.com/spring-proj…

6 Summary

This article gave an overview of Spring Kafka: common configuration, automatic topic creation, publishing and subscribing to messages, stream processing (Kafka Streams) and embedded Kafka for testing, along with multiple producers and consumers, SSL secure transport, Spring Integration Kafka and so on. The article is very long; grasp the overall picture and combine it with practice. Nearly all of the basic content has been covered.

7 Knowledge expansion

In Spring, the Spring Expression Language (SpEL) is different from a property placeholder: a property placeholder is written as ${...}, while a SpEL expression is written as #{...} (except when an Expression is built in code). If the configuration file defines a topics parameter such as spring.kafka.topics, it can be used in the topics attribute of @KafkaListener, for example @KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}").
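
To make the evaluation order of that topics expression concrete, here is a plain-Java imitation of the two steps: the ${topicOne:annotated1,foo} placeholder is resolved first (falling back to the default after the colon), and the resulting string is then split on commas. This is only an illustration under simplifying assumptions (no nesting, no escaping); it is not how Spring implements placeholders or SpEL, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Illustrative only: mimics "#{'${topicOne:annotated1,foo}'.split(',')}" in two steps. */
final class PlaceholderSplitSketch {

    /** Step 1: resolves "${key:default}" against the given properties. */
    static String resolvePlaceholder(String placeholder, Map<String, String> properties) {
        String body = placeholder.substring(2, placeholder.length() - 1); // strip "${" and "}"
        int colon = body.indexOf(':');
        String key = colon < 0 ? body : body.substring(0, colon);
        String fallback = colon < 0 ? null : body.substring(colon + 1);
        return properties.getOrDefault(key, fallback);
    }

    /** Step 2: splits the resolved string into individual topic names. */
    static List<String> topics(Map<String, String> properties) {
        String resolved = resolvePlaceholder("${topicOne:annotated1,foo}", properties);
        return Arrays.asList(resolved.split(","));
    }
}
```

When topicOne is not configured the listener subscribes to annotated1 and foo; when it is set to a comma-separated list, each entry becomes one topic.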

Common examples of SpEL expressions:

// Literals
#{3.1415926}    // Floating-point number
#{9.87E4}       // Scientific notation, i.e. 98700
#{'Hello'}      // String
#{false}        // Boolean
// Reference beans, properties, and methods
#{sgtPeppers}                                   // Use this bean
#{sgtPeppers.artist}                            // Reference a property of the bean
#{sgtPeppers.selectArtist()}                    // Call a method of the bean
#{sgtPeppers.selectArtist().toUpperCase()}      // Operate on the method's return value
#{sgtPeppers.selectArtist()?.toUpperCase()}     // Null-safe: toUpperCase() is called only if selectArtist() does not return null
// To access class-scoped methods and constants, use the key operator T()
#{T(java.lang.Math)}
#{T(java.lang.Math).PI}             // Reference the value of PI
#{T(java.lang.Math).random()}       // Get a random number between 0 and 1
#{T(System).currentTimeMillis()}    // Get the current time in milliseconds
// Substitute property placeholders to get configuration file property values
@Value("#{expression}")
private String variable;
