Spring Cloud Stream new binding method

New spring Cloud Stream documentation

The new version advocates sending and consuming information functionally

Define beans with return type Supplier, Function, or Consumer to provide beans that send and consume messages

Take a look at the binding name naming convention

  • input – + -in- +
  • output – + -out- +

Specified in the configuration file spring. Cloud. The function. The definition after the name of the bean is bound to a corresponding to the consumers and providers.

The following definition binds the bean to either the consumerEvent-in-0 or the provider ConsumerEvent-out 0

Multiple beans can be used; segmentation

spring:
  cloud:
    function:
      definition: consumerEvent
Copy the code

Specify the topic and group of the consumer

spring:
  cloud:
    stream:
      bindings:
        consumerEvent-in-0:
          destination: DEMO
          group: demo-group
Copy the code

Register a consumer’s bean

// The first way (official recommendation)
@Bean
public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {
    return flux -> flux.map(message -> {
        System.out.println(message.getPayload());
        return message;
    }).then();
}

// The second way
// Note that with Flux, subscribe is called otherwise the method will not be consumed
@Bean
public Consumer<Flux<Message<String>>> consumerEvent() {
    return flux -> flux.map(message -> {
        System.out.println(message.getPayload());
        return message;
    }).subscribe();
}
/ / or
@Bean
public Consumer<Message<String>> consumerEvent() {
    return message -> System.out.println(message.getPayload());
}
Copy the code

The sample

The provider

@Configuration
public class EventSender {
    @Bean
    public Demo demo(a) {
        return new Demo();
    }

    static class Demo implements CommandLineRunner {
        @Autowired
        StreamBridge streamBridge;

        @Override
        public void run(String... args) throws Exception {
            final Message<T> message = MessageBuilder.withPayload("Body")
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();

            // The first configuration is the destination
            // If there is a configuration in YAML, it will be sent to the yamL destination
            streamBridge.send("DEMO", message); }}}Copy the code

Configure rocketMQ and Stream configurations

spring:
  application:
    name: demo
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0. 01.: 9876
          group: demo
      bindings:
        consumerEvent-in-0:
          destination: DEMO
          content-type: application/json
          group: demo-group
      function:
        definition: consumerEvent
Copy the code

Register a consumer

@Configuration
public class EventReceptor {
    @Bean
    public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {
        return flux -> flux.map(message -> {
            System.out.println(message.getPayload());
            returnmessage; }).then(); }}Copy the code

Rely on

Spring Cloud 2020 does not use bootstrap by default, but instead relies on spring-cloud-starter-bootstrap

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2020.0.2</version> <type> POm </type> <scope>import</scope> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.5- Rocketmq-rc1 </version> <type> POm </type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> The < version > 3.0.2 < / version > < / dependency >Copy the code

The Tag to filter

The filter tag has been disabled in the new version, and the new SQL and tag are incorporated into the subscription property

this.pushConsumer.subscribe(this.topic, RocketMQUtils.getMessageSelector(((RocketMQConsumerProperties)this.extendedConsumerProperties.getExtension()).getSubscription()));

public static MessageSelector getMessageSelector(String expression) {
    return StringUtils.hasText(expression) && expression.startsWith("sql:")? MessageSelector.bySql(expression.replaceFirst("sql:"."")) : MessageSelector.byTag(expression);
}
Copy the code

If the consumer wants to filter a tag, it says this

// A new version of (Current writing)
rocketmq:
  bindings:
    createUserAccountEvent-in-0:
      consumer:
        subscription: DEMO-TAG

// The old version (Previously written)
rocketmq:
  bindings:
    createUserAccountEvent-in-0:
      consumer:
        tag: DEMO-TAG
Copy the code