Integrating Kafka with Spring Cloud Stream

In practice, a project is never just a question of whether a technology can do the job. In actual development we run into many details (pitfalls). This article picks out a few small points and uses examples to show that the complexity of a project often lies in the tedious practical steps.

1. Group configuration

In the binding configuration for sending messages (the producer side), group does not need to be configured. The group property applies to consumers only.


Proof of this can be found in the comments of the source code

org.springframework.cloud.stream.config.BindingProperties


2. Modify topic Partitions

The configuration file is as follows

bindings:
  output:
    binder: kafka
    destination: wph-d-2 # message destination, i.e. the topic
    content-type: text/plain # message format
    producer:
      partitionCount: 7 # producer property (see ProducerProperties)

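partitionCount sets the number of partitions of the topic. The default value is 1. If the topic already exists with fewer partitions than configured (here 5 instead of 7), startup fails with the following error: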

Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: The number of expected partitions was: 7, but 5 have been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:384) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:325) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:302) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
	... 14 common frames omitted

Add autoAddPartitions as prompted

kafka:
  binder:
    brokers: # Kafka broker addresses
      - localhost:9092
    autoAddPartitions: true

When you start again, you can see that the number of partitions has changed


autoAddPartitions is a property of the class org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties.

partitionCount is a property of the class org.springframework.cloud.stream.binder.ProducerProperties.
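The behaviour described in this section can be summarised in a small plain-Java sketch. It is a simplified model of the rule implied by the error message, not the binder's actual code; the class and method names are made up for illustration, and it throws IllegalStateException rather than the binder's ProvisioningException.

```java
// Simplified model of the partition check; NOT the Kafka binder's real implementation.
final class PartitionCheckSketch {

    /**
     * Returns the partition count the topic ends up with, or throws if the
     * topic has fewer partitions than expected and may not be grown.
     */
    static int resolvePartitions(int expected, int existing, boolean autoAddPartitions) {
        if (existing >= expected) {
            return existing; // enough partitions already, nothing to do
        }
        if (autoAddPartitions) {
            return expected; // the topic would be grown to the expected count
        }
        throw new IllegalStateException("The number of expected partitions was: "
                + expected + ", but " + existing + " have been found instead.");
    }

    public static void main(String[] args) {
        // partitionCount: 7, existing topic has 5, autoAddPartitions: true
        System.out.println(resolvePartitions(7, 5, true));
        try {
            // Same situation with autoAddPartitions left at false
            resolvePartitions(7, 5, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With 7 expected and 5 existing partitions, the first call models the successful restart and the second models the startup failure shown earlier.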

3. An error occurs when sending JSON

Sending a request to /sendMessage/complexType with Postman returns an error.


The error message on the server side is:

Resolved [org.springframework.web.HttpMediaTypeNotSupportedException: Content type 'text/plain;charset=UTF-8' not supported]

The cause is the format of the request body: Postman sent it as text/plain. We need to change the body type in Postman to JSON so that the request is sent with Content-Type application/json.


After that, the request goes through without problems.
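For readers who prefer code to Postman, the same request can be sketched with the JDK's built-in HTTP client (Java 11+). The host, port and the JSON field name are assumptions for illustration; only the path /sendMessage/complexType comes from this article. The sketch builds the request without sending it.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class JsonRequestSketch {

    // Builds a POST request whose body is declared as JSON.
    // "http://localhost:8080" is an assumed address for the producer application.
    static HttpRequest build(String json) {
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/sendMessage/complexType"))
                .header("Content-Type", "application/json") // the header Postman must send as well
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        // "contents" is a hypothetical ChatMessage field
        HttpRequest request = build("{\"contents\":\"hello\"}");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().firstValue("Content-Type").orElse("(none)"));
    }
}
```

Passing the built request to HttpClient.newHttpClient().send(...) would then perform the call against a running application.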


4. Send json correctly and convert it to an object

If we need to send JSON, we need to set the content-type to application/json on the sending side.

bindings:
  output:
    binder: kafka
    destination: wph-d-2 # message destination, i.e. the topic
    content-type: application/json # message format

The message is then sent via the producer

@RequestMapping(value = "/sendMessage/complexType", method = RequestMethod.POST)
public String publishMessageComplextType(@RequestBody ChatMessage payload) {
	logger.info(payload.toString());
	// Attach a "type" header so that subscribers can filter on it
	producer.getMysource().output().send(MessageBuilder.withPayload(payload).setHeader("type", "chatMessage").build());
	return "success";
}

The fields of ChatMessage must have getter and setter methods; otherwise the fields will not receive values when the JSON is converted to an object.
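The article does not show the ChatMessage class itself, so the following is only a sketch of what such a class could look like. The field names contents and time are assumptions; the point is the getter/setter pair for every field.

```java
// Hypothetical shape of ChatMessage; the field names are assumptions.
final class ChatMessage {

    private String contents;
    private long time;

    // No-argument constructor, as JSON mappers typically expect one
    public ChatMessage() {
    }

    public String getContents() {
        return contents;
    }

    public void setContents(String contents) {
        this.contents = contents;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    // Used by logger.info(payload.toString()) in the controller
    @Override
    public String toString() {
        return "ChatMessage{contents='" + contents + "', time=" + time + "}";
    }
}
```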

On the subscribing side, the content-type in application.yml can be left unset, because it defaults to application/json. This can be seen in the comments of the org.springframework.cloud.stream.config.BindingProperties class.


As above, each ChatMessage field needs an accessor method, but here either a getter or a setter is enough.

To receive json and convert it to a class, do the following:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='chatMessage'")
public void handle(ChatMessage message) {
	logger.info(message.toString());
}
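The condition attribute above is a SpEL expression evaluated against the message headers. As a rough plain-Java analogue (not Spring's own evaluation code; the class and method names are made up for illustration), the check amounts to comparing the type header that the producer set with the expected value:

```java
import java.util.Map;

final class HeaderConditionSketch {

    // Roughly mirrors the SpEL condition headers['type']=='chatMessage'
    static boolean matches(Map<String, Object> headers) {
        return "chatMessage".equals(headers.get("type"));
    }

    public static void main(String[] args) {
        // Header set by the producer via setHeader("type", "chatMessage")
        System.out.println(matches(Map.of("type", "chatMessage")));
        // A different type, or no type header at all, does not match
        System.out.println(matches(Map.of("type", "other")));
        System.out.println(matches(Map.of()));
    }
}
```

Messages for which the condition does not hold are not passed to this handler.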

Pitfall warning: if the content-type is set to text/plain on the sending side and to application/json on the subscribing side, the subscriber reports this error:

Caused by: java.lang.IllegalStateException: argument type mismatch
Endpoint [com.wphmoon.kscsclient.Consumer]

If the content-type is set to application/json on the sending side and to text/plain on the subscribing side, I can receive messages and convert them to ChatMessage objects without any problems.

The source code