Spring Cloud Stream: the new binding method
See the new Spring Cloud Stream documentation.
The new version recommends a functional style for sending and consuming messages.
Define beans of type Supplier, Function, or Consumer to send and consume messages.
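To make the three roles concrete, here is a minimal plain-Java sketch that uses only java.util.function, outside of Spring. The class name FunctionalRoles and the sample values are illustrative assumptions; in a real application each of these would be declared as a @Bean and wired to a destination by the binder rather than by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical demo class, not part of Spring Cloud Stream.
public class FunctionalRoles {

    // Collects what the consumer receives so the result can be inspected.
    static final List<String> received = new ArrayList<>();

    // A Supplier only produces data: it plays the role of a message source.
    static Supplier<String> source() {
        return () -> "hello";
    }

    // A Function takes input and returns output: it both consumes and produces.
    static Function<String, String> uppercase() {
        return String::toUpperCase;
    }

    // A Consumer only accepts data: it plays the role of a message sink.
    static Consumer<String> sink() {
        return received::add;
    }

    public static void main(String[] args) {
        // Wire the three together by hand; the binder does this in a real app.
        sink().accept(uppercase().apply(source().get()));
        System.out.println(received);
    }
}
```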
Binding names follow this naming convention:
- input: <functionName> + -in- + <index>
- output: <functionName> + -out- + <index>
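As a small sketch of the convention, a binding name is the function (bean) name, then -in- or -out-, then a zero-based index. The helper class BindingNames below is hypothetical and exists only to spell the rule out; the framework derives these names on its own.

```java
// Hypothetical helper that only illustrates the naming convention.
public class BindingNames {

    // Input binding: <functionName>-in-<index>
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding: <functionName>-out-<index>
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("consumerEvent", 0));  // consumerEvent-in-0
        System.out.println(output("consumerEvent", 0)); // consumerEvent-out-0
    }
}
```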
Once a bean name is listed under spring.cloud.function.definition in the configuration file, that bean is bound to the corresponding consumer or producer.
The following definition binds the bean to the consumer binding consumerEvent-in-0 or the producer binding consumerEvent-out-0.
Multiple beans can be listed, separated by ;
spring:
  cloud:
    function:
      definition: consumerEvent
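To illustrate how a definition with several beans maps to bindings, here is a rough sketch that splits the value on ; and prints the input binding each name would get. DefinitionParser is a hypothetical class and auditEvent is a made-up second bean name; this is not the framework's own parser, only the rule written out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not the framework's parser.
public class DefinitionParser {

    // Splits a definition such as "consumerEvent;auditEvent" into bean names.
    static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // "auditEvent" is an assumed second bean, used only for illustration.
        for (String name : functionNames("consumerEvent;auditEvent")) {
            System.out.println(name + " -> " + name + "-in-0");
        }
    }
}
```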
Specify the topic and group of the consumer
spring:
  cloud:
    stream:
      bindings:
        consumerEvent-in-0:
          destination: DEMO
          group: demo-group
Register a consumer bean
// The three definitions below are alternatives; declare only one of them.

// Option 1 (officially recommended)
@Bean
public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {
    return flux -> flux.map(message -> {
        System.out.println(message.getPayload());
        return message;
    }).then();
}

// Option 2
// Note: with Flux you must call subscribe() yourself, otherwise the messages are never consumed
@Bean
public Consumer<Flux<Message<String>>> consumerEvent() {
    return flux -> flux.map(message -> {
        System.out.println(message.getPayload());
        return message;
    }).subscribe();
}

// Or
@Bean
public Consumer<Message<String>> consumerEvent() {
    return message -> System.out.println(message.getPayload());
}
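Example
The producer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;

@Configuration
public class EventSender {
    @Bean
    public Demo demo() {
        return new Demo();
    }

    static class Demo implements CommandLineRunner {
        @Autowired
        StreamBridge streamBridge;

        @Override
        public void run(String... args) throws Exception {
            final Message<String> message = MessageBuilder.withPayload("Body")
                    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                    .build();
            // The first argument is the destination.
            // If a binding with this name is configured in YAML, the message goes to the destination configured there.
            streamBridge.send("DEMO", message);
        }
    }
}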
Configure RocketMQ and Stream
spring:
  application:
    name: demo
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: demo
      bindings:
        consumerEvent-in-0:
          destination: DEMO
          content-type: application/json
          group: demo-group
    function:
      definition: consumerEvent
Register a consumer
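import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Configuration
public class EventReceptor {
    @Bean
    public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {
        return flux -> flux.map(message -> {
            System.out.println(message.getPayload());
            return message;
        }).then();
    }
}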
Dependencies
Spring Cloud 2020 no longer enables bootstrap by default; to use it, add the spring-cloud-starter-bootstrap dependency.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>2020.0.2</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2.2.5-RocketMQ-RC1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bootstrap</artifactId>
    <version>3.0.2</version>
</dependency>
Tag filtering
The tag property has been removed in the new version; SQL and tag filters are now both expressed through the subscription property.
this.pushConsumer.subscribe(this.topic,
        RocketMQUtils.getMessageSelector(
                ((RocketMQConsumerProperties) this.extendedConsumerProperties.getExtension()).getSubscription()));

public static MessageSelector getMessageSelector(String expression) {
    return StringUtils.hasText(expression) && expression.startsWith("sql:")
            ? MessageSelector.bySql(expression.replaceFirst("sql:", ""))
            : MessageSelector.byTag(expression);
}
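The decision made by getMessageSelector can be sketched without any RocketMQ dependency: a subscription that starts with sql: is treated as an SQL filter with the prefix stripped, and anything else is treated as a tag expression. The class SubscriptionExpression, its Kind enum, and the sample SQL expression are illustrative assumptions; the real method returns a MessageSelector, and this sketch uses substring where the original uses replaceFirst, which gives the same result once the prefix check has passed.

```java
// Hypothetical stand-in that mirrors the prefix check, not the binder's code.
public class SubscriptionExpression {

    enum Kind { SQL, TAG }

    // Simple holder for the outcome of the check.
    static final class Parsed {
        final Kind kind;
        final String expression;

        Parsed(Kind kind, String expression) {
            this.kind = kind;
            this.expression = expression;
        }
    }

    static Parsed parse(String subscription) {
        // Same condition as StringUtils.hasText(...) && startsWith("sql:")
        boolean hasText = subscription != null && !subscription.trim().isEmpty();
        if (hasText && subscription.startsWith("sql:")) {
            return new Parsed(Kind.SQL, subscription.substring("sql:".length()));
        }
        return new Parsed(Kind.TAG, subscription);
    }

    public static void main(String[] args) {
        Parsed tag = parse("DEMO-TAG");
        Parsed sql = parse("sql:a > 5"); // sample SQL expression, assumed for illustration
        System.out.println(tag.kind + " " + tag.expression);
        System.out.println(sql.kind + " " + sql.expression);
    }
}
```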
To filter by tag on the consumer side, configure it like this:
# New version (current syntax)
rocketmq:
  bindings:
    createUserAccountEvent-in-0:
      consumer:
        subscription: DEMO-TAG
# Old version (previous syntax)
rocketmq:
  bindings:
    createUserAccountEvent-in-0:
      consumer:
        tag: DEMO-TAG