Preface
Kafka is a messaging product that achieves very high send performance through Topic Partitions. Spring created the spring-kafka project, which wraps Apache's kafka-clients for quick Kafka integration in Spring projects. Beyond simple messaging, Spring-Kafka provides many advanced features, and we will explore them one by one.
Simple integration
Add the dependency
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>
```
Add the configuration
```properties
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
```
Test send and receive
```java
/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {

    private final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        this.template.send("topic_input", input);
    }

    @KafkaListener(id = "webGroup", topics = "topic_input")
    public void listen(String input) {
        logger.info("input value: {}", input);
    }
}
```
After starting the application, open http://localhost:8080/send/kl in your browser. You will see the log output on the console: input value: "kl". The basics are as simple as that: to send a message, inject a KafkaTemplate; to receive messages, add a @KafkaListener annotation.
Spring-Kafka-Test embedded Kafka Server
Kafka Server is built with Scala and depends on ZooKeeper. You can download the deployment package from the official website and deploy a Kafka Server locally. However, to simplify verifying Kafka-related functionality during development, Spring-Kafka-Test wraps an embedded Kafka and provides annotation-driven, one-click startup of a Kafka Server, and it is super easy to use. All remaining test cases in this article use this embedded service to provide Kafka.
Add the dependency
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.2.6.RELEASE</version>
    <scope>test</scope>
</dependency>
```
Start the service
The following JUnit test case directly starts a Kafka Server with four Broker nodes.
```java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4, ports = {9092, 9093, 9094, 9095})
public class ApplicationTests {

    @Test
    public void contextLoads() throws IOException {
        System.in.read();
    }
}
```
As above: pretty cool that you can start a fully functional Kafka service with a single @EmbeddedKafka annotation. By default, when the annotation is written without parameters, it creates one Broker on a random port; the actual port and the default configuration items are printed in the startup log. The following parameters can be set in the @EmbeddedKafka annotation:
- value: the number of Broker nodes
- count: same as value, the number of Broker nodes
- controlledShutdown: a controlled-shutdown switch that reduces the time Partitions on a Broker are unavailable when it shuts down unexpectedly
Kafka achieves high availability through a multi-Broker architecture: a Topic has multiple Partitions, and each Partition can have multiple replicas stored on different Brokers. However, although a Partition has multiple replicas, only one of them, by default the first assigned replica (the preferred replica), acts as leader and serves reads and writes at any given time. When we need to restart a Broker to upgrade it or change its configuration, the Partitions on it must be transferred to an available Broker. Three cases are involved:
- Shut down the Broker directly: the cluster elects a new leader for each affected Partition; those Partitions are briefly unavailable during the election
- Enable controlledShutdown: before shutting down, the Broker itself first tries to transfer the leader role to another available Broker
- Run bin/kafka-preferred-replica-election.sh to manually trigger a Partition leader transfer
- ports: a list of ports, an array; the number of port numbers should match the number of Brokers given by count
- brokerProperties: Broker parameter settings, an array; Broker parameters can be set like this:
```java
@EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096", "num.io.threads = 8"})
```
- brokerPropertiesLocation: Broker parameter file setting

This provides the same functionality as brokerProperties above, but a Kafka Broker has 182 configurable parameters, and setting them all inside an annotation is clearly not optimal, so it also supports loading a local configuration file, for example:
```java
@EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")
```
Create a new Topic
By default, if a Topic does not exist when a message is sent with KafkaTemplate, a new one is created. Its number of partitions and replicas is taken from the following Broker defaults:

```properties
num.partitions=1
default.replication.factor=1
```
Create a Topic when the program starts
```java
/**
 * @author: kl @kailing.pub
 * @date: 2019/5/31
 */
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaAdmin admin(KafkaProperties properties) {
        KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
        admin.setFatalIfBrokerNotAvailable(true);
        return admin;
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic-kl", 1, (short) 1);
    }
}
```
If the Kafka Broker supports it (version 1.0.0 or later), new partitions are added when an existing Topic is found to have fewer partitions than configured. Several common uses of KafkaAdmin:
- setFatalIfBrokerNotAvailable(true): the default is false, i.e. an unavailable Broker does not affect Spring context initialization. Set it to true if an unavailable Broker should prevent your application from starting normally.
- setAutoCreate(false): the default is true, i.e. Kafka automatically creates Topics for the instantiated NewTopic beans.
- initialize(): when setAutoCreate is false, call admin's initialize() method to create the registered NewTopic objects yourself (see the sketch after this list).
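A minimal sketch of that last combination, assuming the same KafkaConfig setup as above (the ApplicationRunner bean is just one convenient place to trigger initialization; any post-startup hook works):

```java
// Sketch: disable automatic Topic creation on the admin bean,
// then create the registered NewTopic beans explicitly.
@Bean
public KafkaAdmin admin(KafkaProperties properties) {
    KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
    admin.setAutoCreate(false);          // NewTopic beans are no longer created automatically
    return admin;
}

@Bean
public ApplicationRunner createTopics(KafkaAdmin admin) {
    // runs once the context is ready; creates the NewTopic beans now
    return args -> admin.initialize();
}
```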
Create a Topic in code logic
Sometimes you do not know at program startup how many partitions a Topic should have, but you also cannot blindly use the Broker defaults; in that case, use the AdminClient that ships with kafka-clients. The Spring-wrapped KafkaAdmin shown above is also implemented with AdminClient. For example:
```java
@Autowired
private KafkaProperties properties;

@Test
public void testCreateTopic() {
    AdminClient client = AdminClient.create(properties.buildAdminProperties());
    if (client != null) {
        try {
            Collection<NewTopic> newTopics = new ArrayList<>(1);
            newTopics.add(new NewTopic("topic-kl", 1, (short) 1));
            client.createTopics(newTopics);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            client.close();
        }
    }
}
```
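Note that createTopics is asynchronous. If you want to block until the Topic actually exists, you can wait on the returned result and verify with listTopics; a sketch under the same test setup (topic name reused from above):

```java
@Test
public void testCreateTopicAndVerify() throws Exception {
    AdminClient client = AdminClient.create(properties.buildAdminProperties());
    try {
        client.createTopics(Collections.singletonList(new NewTopic("topic-kl", 1, (short) 1)))
              .all().get();                                 // block until the broker confirms creation
        Set<String> names = client.listTopics().names().get();
        System.out.println(names.contains("topic-kl"));     // expect: true
    } finally {
        client.close();
    }
}
```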
PS: Other ways to create a Topic
The APIs above require Spring Boot 2.x, because spring-kafka 2.x only supports Spring Boot 2.x; they are not available in 1.x. Below is an additional way to create a Topic in a program, using kafka_2.10.
Add the dependency
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>
```
Create via the API
```java
@Test
public void testCreateTopic() throws Exception {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$);
    String topicName = "topic-kl";
    int partitions = 1;
    int replication = 1;
    AdminUtils.createTopic(zkClient, topicName, partitions, replication, new Properties());
}
```
Note that the last constructor parameter of ZkClient is an implementation of a serialization/deserialization interface. In my tests, if it is omitted, the Topic data created in ZooKeeper is malformed. Kafka's default implementation is very simple: it just does UTF-8 encoding. ZKStringSerializer$ is an instance of that interface implemented inside Kafka; it is a Scala companion object, and in Java you obtain the instance by accessing its MODULE$ field.
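If I recall the old API correctly (worth double-checking against kafka_2.10 0.8.2.2), AdminUtils also exposes a topicExists check that can guard against creating the same Topic twice; a hedged sketch:

```java
@Test
public void testCreateTopicIfAbsent() throws Exception {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$);
    String topicName = "topic-kl";
    // assumption: AdminUtils.topicExists(ZkClient, String) is available in this kafka version
    if (!AdminUtils.topicExists(zkClient, topicName)) {
        AdminUtils.createTopic(zkClient, topicName, 1, 1, new Properties());
    }
    zkClient.close();
}
```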
Create via command
```java
@Test
public void testCreateTopic() {
    String[] options = new String[]{
            "--create",
            "--zookeeper", "127.0.0.1:2181",
            "--replication-factor", "3",
            "--partitions", "3",
            "--topic", "topic-kl"
    };
    TopicCommand.main(options);
}
```
Exploring message sending with KafkaTemplate
Get the send result
Asynchronous retrieval
```java
template.send("", "").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
    @Override
    public void onFailure(Throwable throwable) {
        // ......
    }

    @Override
    public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
        // ....
    }
});
```
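Since Spring's ListenableFuture also has an addCallback overload that takes a success callback and a failure callback separately, the same thing can be written more compactly with lambdas; a sketch (topic and value are placeholders):

```java
// Sketch: lambda-style callbacks via the two-argument addCallback overload
template.send("topic-kl", "kl").addCallback(
        result -> logger.info("sent to {}@{}",
                result.getRecordMetadata().topic(),
                result.getRecordMetadata().offset()),
        throwable -> logger.error("send failed", throwable));
```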
Synchronous retrieval
```java
ListenableFuture<SendResult<Object, Object>> future = template.send("topic-kl", "kl");
try {
    SendResult<Object, Object> result = future.get();
} catch (Throwable e) {
    e.printStackTrace();
}
```
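future.get() blocks indefinitely; since ListenableFuture extends java.util.concurrent.Future, a bounded wait is also possible. A sketch, assuming a 10-second timeout is acceptable for your use case:

```java
// Sketch: bounded synchronous wait instead of an unbounded get()
ListenableFuture<SendResult<Object, Object>> future = template.send("topic-kl", "kl");
try {
    SendResult<Object, Object> result = future.get(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    // the broker did not acknowledge in time; handle or rethrow
    e.printStackTrace();
} catch (Throwable e) {
    e.printStackTrace();
}
```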
Kafka transaction messages
By default, the KafkaTemplate instance auto-configured by Spring-Kafka cannot send transactional messages. The following configuration is required to activate the transaction feature. Once transactions are activated, all messages can only be sent inside a method where a transaction is active; otherwise an exception complaining about the missing transaction is thrown.
```properties
spring.kafka.producer.transaction-id-prefix=kafka_tx.
```
Use transactions when sends must succeed or fail together, as in the following example: if an exception occurs after the first message is sent and before the second one, the first message is rolled back as well. Moreover, under normal circumstances, if you sleep for a while after sending the first message, the consumer receives the messages only after the transactional method completes.
```java
@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
    template.executeInTransaction(t -> {
        t.send("topic_input", "kl");
        if ("error".equals(input)) {
            throw new RuntimeException("failed");
        }
        t.send("topic_input", "ckl");
        return true;
    });
}
```
Also, when the transaction feature is enabled, the @Transactional annotation on a method takes effect:
```java
@GetMapping("/send/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public void sendFoo(@PathVariable String input) {
    template.send("topic_input", "kl");
    if ("error".equals(input)) {
        throw new RuntimeException("failed");
    }
    template.send("topic_input", "ckl");
}
```
Spring-Kafka's transactional messaging is based on the transactional messaging feature provided by Kafka itself. Kafka's default Broker configuration targets a highly available cluster of three or more Brokers. For simplicity and convenience, the tests used the embedded service to create a single-Broker Kafka service, which caused some problems, such as:
1. If the transaction log replication factor exceeds the number of Brokers, the following exception is thrown:
```text
Number of alive brokers '1' does not meet the required replication factor '3' for the transactions state topic (configured via 'transaction.state.log.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.
```
The Broker default is transaction.state.log.replication.factor=3; on a single node it must be lowered to 1.
2. If the number of replicas is less than the required number of in-sync replicas, the following exception is thrown:
```text
Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]
```
The Broker default is transaction.state.log.min.isr=2; on a single node it must be lowered to 1.
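Combining this with the embedded server from earlier, a single-node test can override both defaults via brokerProperties; a minimal sketch (the property names are the ones from the two exceptions above, the class name is illustrative):

```java
// Sketch: single-broker embedded Kafka configured for transactional messages
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 1, ports = {9092},
        brokerProperties = {
                "transaction.state.log.replication.factor=1",
                "transaction.state.log.min.isr=1"
        })
public class TransactionalKafkaTests {
    // transactional test cases go here
}
```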
Getting a message reply with ReplyingKafkaTemplate
ReplyingKafkaTemplate is a subclass of KafkaTemplate. In addition to inheriting the parent's methods, it adds sendAndReceive, which implements message send/reply semantics:
```java
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
```
That is, I send a message and get back the result the consumer returns to me, just like a traditional RPC interaction. This API is a good fit when the sender needs to know the consumer's specific handling result, for example when a message carries a batch of data and the sender needs to know which records the consumer processed successfully. The following code shows how to integrate and use ReplyingKafkaTemplate:
```java
/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {

    private final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("replies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate(pf);
    }

    @Autowired
    private ReplyingKafkaTemplate template;

    @GetMapping("/send/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendFoo(@PathVariable String input) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
        RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        System.err.println("Return value: " + consumerRecord.value());
    }

    @KafkaListener(id = "webGroup", topics = "topic-kl")
    @SendTo
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }
}
```
Exploring Spring-Kafka message consumption
Using @KafkaListener
The ability of @KafkaListener to receive messages was already demonstrated in the simple integration, but @KafkaListener can do more than that. Other common, practical features include:
- Explicitly specify which Topics and partitions to consume
- Set the initial offset per Topic and partition
- Set the concurrency of consuming threads
- Set a message exception handler
```java
@KafkaListener(id = "webGroup", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
        @TopicPartition(topic = "topic2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    }, concurrency = "6", errorHandler = "myErrorHandler")
public String listen(String input) {
    logger.info("input value: {}", input);
    return "successful";
}
```
The other annotation parameters are easy to understand; errorHandler deserves a note: setting this parameter requires implementing the KafkaListenerErrorHandler interface, and the value configured in the annotation is the bean name of your custom implementation in the Spring context. For example, errorHandler = "myErrorHandler" above means there should be a bean with that name in the Spring context:
```java
/**
 * @author: kl @kailing.pub
 * @date: 2019/5/31
 */
@Service("myErrorHandler")
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

    Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        logger.info(message.getPayload().toString());
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception,
                              Consumer<?, ?> consumer) {
        logger.info(message.getPayload().toString());
        return null;
    }
}
```
Manual Ack mode
In manual ACK mode, the business logic controls when the offset is committed. For example, if consumption carries the semantics that a message whose processing is not acknowledged — i.e., whose offset is not committed — should not count as consumed, you have to use manual ACK mode. To enable it, first turn off automatic commit, then set the consumer's ack mode:
```properties
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
```
Then add an Acknowledgment parameter to the @KafkaListener method; calling ack.acknowledge() commits the offset:
```java
@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input, Acknowledgment ack) {
    logger.info("input value: {}", input);
    if ("kl".equals(input)) {
        ack.acknowledge();
    }
    return "successful";
}
```
@KafkaListener listener lifecycle
The lifecycle of a listener declared with @KafkaListener is controllable. By default, the @KafkaListener parameter autoStartup = "true", meaning consumption starts automatically, but you can also intervene in its lifecycle through KafkaListenerEndpointRegistry. KafkaListenerEndpointRegistry offers three action methods: start(), pause(), and resume(), which start, pause, and resume consumption respectively. The following code demonstrates this in detail.
```java
/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {

    private final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private KafkaTemplate template;

    @GetMapping("/send/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendFoo(@PathVariable String input) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
        template.send(record);
    }

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @GetMapping("/stop/{listenerID}")
    public void stop(@PathVariable String listenerID) {
        registry.getListenerContainer(listenerID).pause();
    }

    @GetMapping("/resume/{listenerID}")
    public void resume(@PathVariable String listenerID) {
        registry.getListenerContainer(listenerID).resume();
    }

    @GetMapping("/start/{listenerID}")
    public void start(@PathVariable String listenerID) {
        registry.getListenerContainer(listenerID).start();
    }

    @KafkaListener(id = "webGroup", topics = "topic-kl", autoStartup = "false")
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }
}
```
In the code above, listenerID is the id value "webGroup" from @KafkaListener. Once the project is started, hit the following URLs in order to see the results.
Send a message: http://localhost:8081/send/ckl. Because autoStartup = "false", no message arrives at the listener.
Then start the listener: http://localhost:8081/start/webGroup. You will see a message come in.
The effects of pausing and resuming consumption can be tested in the same way.
@SendTo message forwarding
Besides the send/reply semantics shown earlier, the @SendTo annotation can also take a parameter specifying the Topic to forward to. A common scenario: a message needs to be processed in several steps, and the steps consume different amounts of machine resources; this can be solved by chaining consumers on different Topics deployed on different hosts. For example:
```java
@KafkaListener(id = "webGroup", topics = "topic-kl")
@SendTo("topic-ckl")
public String listen(String input) {
    logger.info("input value: {}", input);
    return input + "hello!";
}

@KafkaListener(id = "webGroup1", topics = "topic-ckl")
public void listen2(String input) {
    logger.info("input value: {}", input);
}
```
Message retries and dead-letter queues
Besides the manual Ack mode above for controlling offsets, Spring-Kafka also encapsulates retry semantics for message consumption: a record that fails to be consumed can be retried a configurable number of times, after which the message is sent to a predetermined dead-letter Topic. The following code demonstrates this:
```java
@Autowired
private KafkaTemplate template;

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    // retry at most three times, then publish the record to the dead-letter topic
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
    return factory;
}

@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
    template.send("topic-kl", input);
}

@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input) {
    logger.info("input value: {}", input);
    throw new RuntimeException("dlt");
}

@KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
public void dltListen(String input) {
    logger.info("Received from DLT: " + input);
}
```
When the topic-kl listener receives a message, a runtime exception is thrown and the listener retries three times; once the maximum number of retries is reached, the message is dropped into the dead-letter queue. The naming rule for dead-letter Topics is the business Topic name plus the ".DLT" suffix: the business Topic above is named "topic-kl", so the corresponding dead-letter Topic is "topic-kl.DLT".
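DeadLetterPublishingRecoverer also copies failure details into the forwarded record's headers (e.g. KafkaHeaders.DLT_EXCEPTION_MESSAGE), so the dead-letter consumer can see why the original message failed. A sketch, assuming these header constants are available in this spring-kafka version (the listener id is illustrative):

```java
// Sketch: read the failure reason from DLT record headers
@KafkaListener(id = "dltGroup2", topics = "topic-kl.DLT")
public void dltListenWithCause(ConsumerRecord<String, String> record,
        @Header(value = KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) byte[] reason) {
    logger.info("Received from DLT: {}, cause: {}",
            record.value(),
            reason == null ? "unknown" : new String(reason, StandardCharsets.UTF_8));
}
```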
Conclusion
Recently I used Kafka in business, and spring-kafka along with it, so I systematically explored the various uses of Spring-Kafka and discovered many interesting and cool features, such as starting an embedded Kafka service with a single annotation, RPC-like send/reply semantic calls, and transactional messages. I hope this post helps those of you who are using Spring-Kafka, or about to, avoid a few detours.