Integrating Kafka with Spring Cloud Stream

Knowing a technology is not the same as using it in a real project. In actual development we run into many small details (pitfalls). This article picks a few of them and walks through each with an example, to show that a project's complexity often lies in its tedious practical steps.
1. Group configuration
When configuring a binding that only sends messages, there is no need to configure a group, because the group applies to consumers only.
This is stated in the source code comments of org.springframework.cloud.stream.config.BindingProperties.
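As a rough sketch of what this means in application.yml, the fragment below sets a group only on the consuming binding. The binding names input and output and the group name chat-group are assumptions made for illustration; only the destination wph-d2 comes from this article.

```yaml
spring:
  cloud:
    stream:
      bindings:
        output:                  # producer binding: no group is needed
          destination: wph-d2
        input:                   # consumer binding: the group applies here
          destination: wph-d2
          group: chat-group      # hypothetical group name
```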
2. Modify topic Partitions
The configuration file is as follows
bindings:
  output:
    binder: kafka
    destination: wph-d2 # The destination to which the message is sent
    content-type: text/plain # The format of the message
    producer:
      partitionCount: 7
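partitionCount sets the number of partitions of the topic. The default value is 1. If the topic already exists with fewer partitions (5 here), the application fails to start with the following error: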
Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: The number of expected partitions was: 7, but 5 have been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:384) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:325) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:302) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
    ... 14 common frames omitted
Add autoAddPartitions as prompted
kafka:
  binder:
    brokers: # Kafka broker addresses
      - localhost:9092
    autoAddPartitions: true
When you start again, you can see that the number of partitions has changed
The autoAddPartitions property belongs to the class org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties.
The partitionCount property belongs to the class org.springframework.cloud.stream.binder.ProducerProperties.
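The interplay between partitionCount and autoAddPartitions can be modelled in a few lines. This is a simplified illustration of the behavior described above, not the binder's actual implementation; the class PartitionCheck and the method resolvePartitions are hypothetical names.

```java
// Simplified model of the partition check; NOT the binder's real code.
final class PartitionCheck {

    /**
     * Returns the partition count the topic ends up with,
     * or throws if the topic is too small and may not be expanded.
     */
    static int resolvePartitions(int expected, int actual, boolean autoAddPartitions) {
        if (actual >= expected) {
            return actual; // the topic already has enough partitions
        }
        if (autoAddPartitions) {
            return expected; // the topic is expanded to the expected count
        }
        throw new IllegalStateException("The number of expected partitions was: "
                + expected + ", but " + actual + " have been found instead.");
    }

    public static void main(String[] args) {
        // With autoAddPartitions enabled, the topic grows from 5 to 7 partitions.
        System.out.println(resolvePartitions(7, 5, true));
        // Without it, the same situation fails at startup.
        try {
            resolvePartitions(7, 5, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch also reflects that an existing topic with more partitions than partitionCount is left alone.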
3. An error occurs when sending JSON
Sending a request to /sendMessage/complexType with Postman fails.
The error message on the server side is:
Resolved [org.springframework.web.HttpMediaTypeNotSupportedException: Content type 'text/plain; charset=UTF-8' not supported]
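The reason is that the request body is sent in the wrong format: Postman sends it as text/plain, which the endpoint does not accept. We need to change the format of the data sent by Postman to JSON (application/json).
After that, the request is sent successfully.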
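The same fix can be expressed outside Postman. Here is a minimal sketch, using the JDK 11+ java.net.http API, that builds the request with an explicit application/json Content-Type header. The host and port (localhost:8080), the class name JsonRequestExample and the JSON field contents are assumptions for illustration; only the path comes from this article.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class JsonRequestExample {

    /** Builds a POST request whose body is declared as JSON. */
    static HttpRequest buildRequest(String json) {
        return HttpRequest.newBuilder()
                // Host and port are assumed; adjust to where the service runs.
                .uri(URI.create("http://localhost:8080/sendMessage/complexType"))
                // Without this header the body would not be treated as JSON.
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        // "contents" is a hypothetical field name.
        HttpRequest request = buildRequest("{\"contents\":\"hello\"}");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().firstValue("Content-Type").orElse("(none)"));
    }
}
```

The request is only built here, not sent; passing it to HttpClient.send would perform the actual call.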
4. Send JSON correctly and convert it to an object
To send JSON, set the content-type to application/json on the sending side.
bindings:
  output:
    binder: kafka
    destination: wph-d2 # The destination to which the message is sent
    content-type: application/json # The format of the message
The message is then sent via the producer
@RequestMapping(value = "/sendMessage/complexType", method = RequestMethod.POST)
public String publishMessageComplextType(@RequestBody ChatMessage payload) {
    logger.info(payload.toString());
    // Tag the message with a "type" header so that consumers can filter on it
    producer.getMysource().output().send(
            MessageBuilder.withPayload(payload).setHeader("type", "chatMessage").build());
    return "success";
}
The fields of ChatMessage must have getter and setter methods; otherwise they will not receive a value when the JSON is converted to an object.
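The article does not show the ChatMessage class itself, so the following is only a sketch of what such a class might look like. The field names contents and time are hypothetical; the point is the no-argument constructor plus a getter and setter for each field.

```java
// Hypothetical shape of the payload class; the field names are assumptions.
final class ChatMessage {
    private String contents;
    private long time;

    // A no-argument constructor lets the JSON converter create the object.
    public ChatMessage() {
    }

    public String getContents() {
        return contents;
    }

    public void setContents(String contents) {
        this.contents = contents;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "ChatMessage{contents='" + contents + "', time=" + time + "}";
    }
}
```

With this shape, a body such as {"contents":"hello","time":1} maps onto the two fields by name.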
When subscribing to messages, the content-type in application.yml does not need to be set, because it defaults to application/json. This can be seen in the comments of the org.springframework.cloud.stream.config.BindingProperties class.
As above, the fields of ChatMessage need a getter or a setter method; either one will do.
To receive json and convert it to a class, do the following:
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='chatMessage'")
public void handle(ChatMessage message) {
    logger.info(message.toString());
}
Pitfall warning: if the content-type of the sender is set to text/plain and the content-type of the subscriber is set to application/json, the subscriber reports this error:
Caused by: java.lang.IllegalStateException: argument type mismatch
Endpoint [com.wphmoon.kscsclient.Consumer]
In the opposite case, with the content-type set to application/json on the sending side and text/plain on the subscriber side, messages are received and converted to ChatMessage objects without any problems.