1. Introduction

Apache Kafka is a distributed, fault-tolerant stream processing system. In this article, we’ll look at Spring’s support for Apache Kafka and the level of abstraction it provides over the native Kafka Java client APIs.

Spring Kafka brings the simple and typical Spring template programming model with a KafkaTemplate, and message-driven POJOs via the @KafkaListener annotation.

2. Installation and configuration

To download and install Kafka, refer to the official guide. Then add the spring-kafka dependency to the pom.xml file:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.7.RELEASE</version>
</dependency>

Create a new Spring Boot sample application that starts with the default configuration.

3. Configuring topics

Previously we used command line tools to create topics in Kafka, for example:

$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic

But with the introduction of AdminClient in Kafka, we can now create topics programmatically.
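For comparison, the same topic can be created with the plain Kafka AdminClient directly, without Spring. This is a minimal sketch; the broker address is an assumption for illustration:

```java
// Plain Kafka AdminClient (no Spring); broker address assumed for illustration
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient adminClient = AdminClient.create(config)) {
    NewTopic topic = new NewTopic("mytopic", 1, (short) 1);
    // all().get() blocks until the broker confirms topic creation
    adminClient.createTopics(Collections.singletonList(topic)).all().get();
}
```

Spring takes care of this bookkeeping for us.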

Add a KafkaAdmin bean to Spring, which will automatically create topics for all beans of type NewTopic:

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
 
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
        return new NewTopic("developlee", 1, (short) 1);
    }
}

4. Message production

To produce messages, we first configure a ProducerFactory, which sets the strategy for creating Kafka Producer instances. Then we use a KafkaTemplate, which wraps a Producer instance and provides convenient methods for sending messages to a Kafka topic.

Producer instances are thread-safe, so using a single instance throughout the application context gives higher performance. Consequently, KafkaTemplate instances are also thread-safe, and using one instance is recommended.

4.1. Producer configuration

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

4.2. Publishing messages

We use KafkaTemplate to post messages:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
 
public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

The send API returns a ListenableFuture object. If we want to block the sending thread and get the result of the sent message, we can call the get method of the ListenableFuture object. The thread will wait for the result, but this slows down the producer.
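For example, a blocking variant of the sendMessage method above might look like this sketch (the method name and exception handling are illustrative):

```java
public void sendMessageSync(String msg)
  throws InterruptedException, ExecutionException {
    // get() blocks the calling thread until the send completes
    SendResult<String, String> result = kafkaTemplate.send(topicName, msg).get();
    System.out.println("Sent message=[" + msg + "] with offset=["
      + result.getRecordMetadata().offset() + "]");
}
```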

Kafka is a fast stream processing platform. Therefore, it is better to handle the results asynchronously so that subsequent messages do not wait for the result of the previous message. We can do this with a callback:

public void sendMessage(String message) {
            
    ListenableFuture<SendResult<String, String>> future = 
      kafkaTemplate.send(topicName, message);
	
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
 
        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=[" + message + 
              "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=[" 
              + message + "] due to : "+ ex.getMessage()); }}); }Copy the code

5. Message consumption

5.1. Consumer configuration

To consume messages, we need to configure a ConsumerFactory and a KafkaListenerContainerFactory.

Once these beans are available in the Spring bean factory, POJO-based consumers can be configured using the @KafkaListener annotation.

The @EnableKafka annotation must be added to the configuration class so that the @KafkaListener annotation on Spring-managed beans can be detected:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    // the property name for the group ID is an example
    @Value(value = "${kafka.groupId}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

5.2. Message consumption

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

Multiple listeners can be implemented for a topic, each with a different group ID. Furthermore, one consumer can listen to messages from multiple topics:

@KafkaListener(topics = { "topic1", "topic2" }, groupId = "foo")
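For instance, two listeners subscribed to the same topic but registered under different group IDs will each receive every message published to the topic (the group names here are illustrative):

```java
@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

// A second group: both listeners receive each message independently
@KafkaListener(topics = "topicName", groupId = "bar")
public void listenGroupBar(String message) {
    System.out.println("Received Message in group bar: " + message);
}
```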

Spring also supports retrieving one or more message headers in the listener using the @Header annotation:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message + " from partition: " + partition);
}

5.3. Consume messages from a specific partition

Note that we created the topic “developlee” with only one partition. For a topic with multiple partitions, however, a @KafkaListener can explicitly subscribe to specific partitions of the topic with an initial offset:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
    partitionOffsets = {
      @PartitionOffset(partition = "0", initialOffset = "0"),
      @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message + " from partition: " + partition);
}

Since initialOffset is set to 0 in this listener, all previously consumed messages from partitions 0 and 3 will be re-consumed every time the listener is initialized. If we don’t need to set the offset, we can use the partitions attribute of the @TopicPartition annotation to set only the partitions without offsets:

@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. Add a message filter to the Listener

Listeners can be configured to consume specific types of messages by adding a custom filter. This is done by setting a RecordFilterStrategy on the KafkaListenerContainerFactory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {
 
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
      record -> record.value().contains("World"));
    return factory;
}

The Listener can then be configured to use this container factory:

@KafkaListener( topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

In this listener, all messages that match the filter are discarded.

6. Custom message converters

So far, we’ve only discussed strings as objects for sending and receiving messages. However, we can also send and receive custom Java objects. This requires configuring the appropriate serializer in ProducerFactory and the deserializer in ConsumerFactory.

Let’s look at a simple bean and send it as a message:

public class Greeting {
 
    private String msg;
    private String name;
 
    // standard getters, setters and constructor
}

6.1. Produce custom messages

In this example, we will use JsonSerializer. Let’s look at ProducerFactory and KafkaTemplate:

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}
 
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

The new KafkaTemplate can be used to send Greeting messages:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. Consume custom messages

Similarly, we modify the ConsumerFactory and KafkaListenerContainerFactory to deserialize the Greeting message correctly:

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}
 
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
  greetingKafkaListenerContainerFactory() {
 
    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

The spring-kafka JSON serializer and deserializer use the Jackson library, which is an optional Maven dependency of the spring-kafka project. So we also add it to the pom.xml file:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>

Rather than using the latest version of Jackson, it is recommended to use the version that spring-kafka’s pom.xml declares. Finally, we need to write a listener to consume Greeting messages:

@KafkaListener( topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process greeting message
}

7. Conclusion

In this article, we covered the basics of Spring integration with Apache Kafka, along with a brief look at the classes used to send and receive messages. The full source code for this article can be found on GitHub. Before running the code, make sure the Kafka server is running.