Spring for Apache Kafka brings the familiar Spring programming model to Kafka. It provides a KafkaTemplate for publishing records and a listener container for asynchronous execution of POJO listeners. Spring Boot auto-configuration wires up much of the infrastructure, so you can concentrate on your business logic.
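
For example, publishing a record can be as simple as injecting the auto-configured template and calling its send() method (a minimal sketch; the topic name and payload are illustrative):

@Autowired
private KafkaTemplate<Object, Object> template;

public void send(String what) {
    // Fire-and-forget publish to the "topic1" topic
    this.template.send("topic1", what);
}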

Error recovery

Consider this simple POJO listener method:

@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(String in) {
  logger.info("Received: " + in);
  if (in.startsWith("foo")) {
    throw new RuntimeException("failed");
  }
}

By default, a failed record is simply logged, and we move on to the next one. However, we can configure an error handler in the listener container to perform some other action. To do that, we override Spring Boot's auto-configured container factory with our own:

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
    ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
    ConsumerFactory<Object, Object> kafkaConsumerFactory) {
  ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
  configurer.configure(factory, kafkaConsumerFactory);
  factory.setErrorHandler(new SeekToCurrentErrorHandler()); // <<<<<<
  return factory;
}

Note that, because Boot's configurer still performs most of the setup, we can still take advantage of most of the auto-configuration.
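
For instance, broker and consumer settings still come from application.yml as usual (a sketch; the values shown are illustrative):

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest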

SeekToCurrentErrorHandler discards the remaining records from the poll() and performs seek operations on the consumer to reset the offsets, so that the discarded records are fetched again on the next poll. By default, the error handler tracks the failed record, gives up after 10 delivery attempts, and logs it. However, we can also send the failed record to another topic. We call this a dead-letter topic.

Here's the code, all put together:

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
    ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
    ConsumerFactory<Object, Object> kafkaConsumerFactory,
    KafkaTemplate<Object, Object> template) {
  ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
  configurer.configure(factory, kafkaConsumerFactory);
  factory.setErrorHandler(new SeekToCurrentErrorHandler(
      new DeadLetterPublishingRecoverer(template), 3));
  return factory;
}

@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(String in) {
  logger.info("Received: " + in);
  if (in.startsWith("foo")) {
    throw new RuntimeException("failed");
  }
}

@KafkaListener(id = "dltGroup", topics = "topic1.DLT")
public void dltListen(String in) {
  logger.info("Received from DLT: " + in);
}

Deserialization errors

But what about deserialization exceptions, which occur before Spring gets the record? Enter the ErrorHandlingDeserializer. This deserializer wraps a delegate deserializer and catches any exceptions. These are then forwarded to the listener container, which sends them directly to the error handler. The exception contains the source data, so you can diagnose the problem.
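
A minimal way to wire this up through Spring Boot properties might look like the following (a sketch assuming a recent spring-kafka, where the delegate is configured via the spring.deserializer.value.delegate.class property; in spring-kafka 2.2 the class is named ErrorHandlingDeserializer2):

spring:
  kafka:
    consumer:
      # Wrapper that catches deserialization exceptions instead of failing the poll
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        # The real deserializer the wrapper delegates to
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer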

Domain objects and inferred types

Consider the following example:

@Bean
public RecordMessageConverter converter() {
  return new StringJsonMessageConverter();
}

@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(Foo2 foo) {
  logger.info("Received: " + foo);
  if (foo.getFoo().startsWith("fail")) {
    throw new RuntimeException("failed");
  }
}

@KafkaListener(id = "dltGroup", topics = "topic1.DLT")
public void dltListen(Foo2 in) {
  logger.info("Received from DLT: " + in);
}

Notice that we are now consuming objects of type Foo2. The message converter bean infers the type to convert to from the parameter type in the method signature. The converter automatically "trusts" the type, and Spring Boot automatically configures the converter into the listener container.
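
The Foo1 and Foo2 domain classes are not shown in this article; as a hypothetical sketch, Foo2 might be a plain POJO like this (Foo1 on the producer side would look similar, plus a convenience constructor taking the value):

public class Foo2 {

    private String foo;

    public Foo2() {
    }

    public String getFoo() {
        return this.foo;
    }

    public void setFoo(String foo) {
        this.foo = foo;
    }

    @Override
    public String toString() {
        return "Foo2 [foo=" + this.foo + "]";
    }

}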

On the producer side, the object sent can be a different class (as long as it is type-compatible):

@RestController
public class Controller {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @PostMapping(path = "/send/foo/{what}")
    public void sendFoo(@PathVariable String what) {
        this.template.send("topic1", new Foo1(what));
    }

}

Configuration:

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

$ curl -X POST http://localhost:8080/send/foo/fail

Here, we use StringDeserializer and a “smart” message converter on the consumer side.
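
Spelled out as Boot properties, that consumer side might look like this (a sketch for illustration; StringDeserializer is already Boot's default, so only the JSON message converter bean from the example above is strictly needed):

spring:
  kafka:
    consumer:
      # Records arrive as raw JSON strings; the message converter does the mapping
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer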

Multi-method listeners

We can also receive different types on a single listener container and route to type-specific methods. We can't infer the type here, since the type is used to select the method to call.

Instead, we rely on type information passed in record headers to map from the source type to the target type. Also, since we do not infer the type, we need to configure the message converter to "trust" the package of the mapped types.

In this case, we'll use the message converter on both sides (together with a StringSerializer and a StringDeserializer). The following example of the consumer-side converter puts it all together:

@Bean
public RecordMessageConverter converter() {
  StringJsonMessageConverter converter = new StringJsonMessageConverter();
  DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
  typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
  typeMapper.addTrustedPackages("com.common");
  Map<String, Class<?>> mappings = new HashMap<>();
  mappings.put("foo", Foo2.class);
  mappings.put("bar", Bar2.class);
  typeMapper.setIdClassMapping(mappings);
  converter.setTypeMapper(typeMapper);
  return converter;
}

Here, we map "foo" to class Foo2 and "bar" to class Bar2. Note that we must tell the converter to use the TYPE_ID header to determine the type for the conversion, and again, Spring Boot automatically configures the message converter into the container. Below is the producer-side type mapping, in a fragment of the application.yml file; the format is a comma-delimited list of token:FQCN pairs:

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.type.mapping: foo:com.common.Foo1,bar:com.common.Bar1

This configuration maps the class Foo1 to “foo” and the class Bar1 to “bar”.

Listener:

@Component
@KafkaListener(id = "multiGroup", topics = { "foos", "bars" })
public class MultiMethods {

    @KafkaHandler
    public void foo(Foo2 foo) {
        System.out.println("Received: " + foo);
    }

    @KafkaHandler
    public void bar(Bar2 bar) {
        System.out.println("Received: " + bar);
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("Received unknown: " + object);
    }

}

The producer:

@RestController
public class Controller {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @PostMapping(path = "/send/foo/{what}")
    public void sendFoo(@PathVariable String what) {
        this.template.send(new GenericMessage<>(new Foo1(what),
                Collections.singletonMap(KafkaHeaders.TOPIC, "foos")));
    }

    @PostMapping(path = "/send/bar/{what}")
    public void sendBar(@PathVariable String what) {
        this.template.send(new GenericMessage<>(new Bar1(what),
                Collections.singletonMap(KafkaHeaders.TOPIC, "bars")));
    }

    @PostMapping(path = "/send/unknown/{what}")
    public void sendUnknown(@PathVariable String what) {
        this.template.send(new GenericMessage<>(what,
                Collections.singletonMap(KafkaHeaders.TOPIC, "bars")));
    }

}

Transactions

Enable transactions by setting the transaction-id-prefix in the application.yml file:

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      transaction-id-prefix: tx.
    consumer:
      properties:
        isolation.level: read_committed

When using spring-kafka 1.3.x or later with a kafka-clients version that supports transactions (0.11 or later), any KafkaTemplate operations performed within a @KafkaListener method will participate in the transaction, and the listener container will send the offsets to the transaction before committing it. Notice that we also set the isolation level for consumers so that they have no visibility of uncommitted records. The following example pauses the listener so that we can see the effect:

@KafkaListener(id = "fooGroup2", topics = "topic2")
public void listen(List<Foo2> foos) throws IOException {
  logger.info("Received: " + foos);
  foos.forEach(f -> kafkaTemplate.send("topic3", f.getFoo().toUpperCase()));
  logger.info("Messages sent, hit enter to commit tx");
  System.in.read();
}

@KafkaListener(id = "fooGroup3", topics = "topic3")
public void listen(String in) {
  logger.info("Received: " + in);
}

The producer in this example sends multiple records in a single transaction:

@PostMapping(path = "/send/foos/{what}")
public void sendFoo(@PathVariable String what) {
  this.template.executeInTransaction(kafkaTemplate -> {
    StringUtils.commaDelimitedListToSet(what).stream()
      .map(s -> new Foo1(s))
      .forEach(foo -> kafkaTemplate.send("topic2", foo));
    return null;
  });
}

curl -X POST http://localhost:8080/send/foos/a,b,c,d,e

Received: [Foo2 [foo=a], Foo2 [foo=b], Foo2 [foo=c], Foo2 [foo=d], Foo2 [foo=e]]
Messages sent, hit Enter to commit tx

Received: [A, B, C, D, E]

A final word:

If you've read this far and found the article helpful, please give it a like and a follow! Follow along so you don't get lost; more updates are on the way!