My blog: Programmer Xiaoxiao Sheng. Welcome!

In the previous chapter on Spring Cloud Stream consumer groups, we configured consumer grouping so that each message is received by only one consumer within a group. This chapter describes message partitioning.

Preface

With Spring Cloud Stream it is easy to connect a single application to message middleware, but in practice an application usually runs as multiple instances. In real scenarios we often need messages of the same kind, such as messages for the same user or log messages of the same type, to always be consumed by the same instance so that it can compute statistics. By default such messages are scattered across different instances, which makes this difficult. Message partitioning solves this problem.

Instance Index and Instance Count

When the same application is deployed as a cluster, each instance can be told how many instances of the application exist and what its own index within the cluster is. Spring Cloud Stream does this through two properties: spring.cloud.stream.instanceCount (the number of instances) and spring.cloud.stream.instanceIndex (the index of the current instance). If instanceCount is 3, the instanceIndex values are 0, 1 and 2. Configuring these two properties correctly is essential for partitioning, because they ensure that messages are split properly across the instances.
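As a rough illustration of how the two values relate, the sketch below checks whether an index is valid for a given instance count and shows which instance would own a partition. The class and method names are hypothetical, and the ownership rule (partition modulo instance count) is an assumption made for this example, not code taken from the binder.

```java
/** Hypothetical helper for illustration; not part of Spring Cloud Stream. */
final class InstanceLayout {

    /** An index is valid when it lies in the range [0, instanceCount). */
    static boolean isValidIndex(int instanceIndex, int instanceCount) {
        return instanceCount > 0 && instanceIndex >= 0 && instanceIndex < instanceCount;
    }

    /** Assumption: partition p is consumed by the instance whose index is p % instanceCount. */
    static int ownerOf(int partition, int instanceCount) {
        return partition % instanceCount;
    }

    public static void main(String[] args) {
        int instanceCount = 3;
        // Indexes 0, 1 and 2 are valid for three instances; 3 is out of range.
        for (int index = 0; index <= instanceCount; index++) {
            System.out.println("instanceIndex=" + index
                    + " valid=" + isValidIndex(index, instanceCount));
        }
    }
}
```

If two instances were started with the same index, they would compete for the same partition while another partition had no consumer, which is why each instance needs a distinct value.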

Unpartitioned tests

2.1 Producer Configuration

The message entity class to send:

import java.io.Serializable;

public class Message implements Serializable {

    private int id;
    private String body;

    public Message() {
    }

    public Message(int id, String body) {
        this.id = id;
        this.body = body;
    }

    // Getters and setters are not shown in the original listing.
}

Create a channel interface that sends messages

public interface SenderSource {
    String OUTPUT = "input";

    @Output(SenderSource.OUTPUT)
    MessageChannel output();

}

Test classes for sending messages:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SenderApplication.class)
public class MessageTest {

    @Autowired
    SenderSource source;

    @Test
    public void sender() {
        Message message = new Message(1, "test send");
        // Send the same payload ten times.
        for (int i = 0; i < 10; i++) {
            source.output().send(MessageBuilder.withPayload(message).build());
        }
    }
}

application.yml configuration:

spring:
  cloud:
    stream:
      bindings:
         input:
            destination: topic
            binder: rabbit1
      binders:
         rabbit1:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672

2.2 Consumer Configuration

Create the Message entity:

import java.io.Serializable;

public class Message implements Serializable {

    private int id;
    private String body;

    public Message() {
    }

    public Message(int id, String body) {
        this.id = id;
        this.body = body;
    }

    // Getters and setters are not shown in the original listing.
}

Receive messages:

@SpringBootApplication
@EnableBinding(Sink.class)
@EnableEurekaClient
public class ReceiverApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication.class, args);
    }

    @Value("${server.port}")
    private String port;

    /**
     * Listens for RabbitMQ messages on the Sink.INPUT channel;
     * the queue and topic come from the binding configuration.
     */
    @StreamListener(Sink.INPUT)
    public void reader(Message msg) {
        System.out.println("receiver port:" + port + ",message:" + msg);
    }
}

Consumer application.yml configuration:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: topic
          binder: rabbit1
          group: group1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
server:
  port: 8081
eureka:
  instance:
    hostname: eureka7001.com # Instance name of the eureka server
    instance-id: receiver1
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka/
    enabled: true

To start several consumer instances at the same time, change the port number and eureka.instance.instance-id for each one, and make sure Eureka is running as well. We then send the messages through the producer:

We see that the copies of the same message are spread across the different instances:

Instance 1: [console output screenshot]

Instance 2: [console output screenshot]
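To make the unpartitioned behaviour concrete, here is a small simulation of ten messages delivered to a group of two consumers. It assumes the broker hands messages to the group's consumers in turn, which is RabbitMQ's usual behaviour for competing consumers on one queue; the class name and method are hypothetical and nothing here talks to a real broker.

```java
/** Hypothetical simulation of delivery within one consumer group. */
final class RoundRobinDemo {

    /** Returns how many of messageCount messages each of instanceCount consumers receives. */
    static int[] distribute(int messageCount, int instanceCount) {
        int[] received = new int[instanceCount];
        for (int i = 0; i < messageCount; i++) {
            // Assumption: message i goes to the next consumer in turn.
            received[i % instanceCount]++;
        }
        return received;
    }

    public static void main(String[] args) {
        int[] received = distribute(10, 2);
        for (int i = 0; i < received.length; i++) {
            System.out.println("instance " + (i + 1) + " received " + received[i] + " messages");
        }
    }
}
```

Under this assumption the content of a message plays no part in choosing the instance, so identical messages end up on both consumers.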

Adding partitions

2.1 Configuring the producer

Specify the expression for the partition key. Using payload means that the partition is calculated from the hash of the message payload:

spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload

Number of message partitions

spring.cloud.stream.bindings.input.producer.partitionCount=2

spring:
  cloud:
    stream:
      bindings:
         input:
            producer:
               partitionKeyExpression: payload
               partitionCount: 2
            destination: topic
            binder: rabbit1

      binders:
         rabbit1:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672

2.2 Configuring consumers:

Enable partitioning:

spring.cloud.stream.bindings.input.consumer.partitioned=true

Specify the total number of consumer instances, which is 2 here:

spring.cloud.stream.instanceCount=2

Set the index of the current instance, starting from 0:

spring.cloud.stream.instanceIndex=0

Each instance must be given its own instanceIndex.

spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            partitioned: true
          destination: topic
          binder: rabbit1
          group: group1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
      instance-count: 2
      instance-index: 0

After making these changes, restart all instances and send the messages through the producer again. This time all of the messages arrive at a single instance:

Messages of the same type are now always delivered to the same instance.
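All ten messages land on one instance because the partition is derived from the key's hash, so equal keys always map to the same partition. The sketch below illustrates the idea; it is a stand-in with hypothetical names, not the framework's source. It uses Math.floorMod rather than a plain % so that a negative hashCode() still yields a non-negative partition, which is a choice made for this example.

```java
/** Hypothetical stand-in for hash-based partition selection. */
final class HashPartitioner {

    /** Maps a key to a partition in the range [0, partitionCount). */
    static int partitionFor(Object key, int partitionCount) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitionCount = 2;
        String[] keys = {"user-1", "user-2", "user-1"};
        for (String key : keys) {
            // The two "user-1" entries always print the same partition.
            System.out.println(key + " -> partition " + partitionFor(key, partitionCount));
        }
    }
}
```

With partitionCount set to 2 and two consumer instances, each partition is served by exactly one instance, so a repeated key always reaches the same consumer.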

3. Customize a partition policy

In the demonstration above, we set spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload to enable partitioning. What does this mean? With this property set, payload refers to the message body we defined, and by default Spring Cloud Stream derives the partition from the key's hash:

key.hashCode() % partitionCount

This determines which partition the message goes to. With that in mind, we can do much more. Spring Cloud Stream provides the org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy interface for implementing a custom partitioning strategy. Here is a simple one: for the same message we pick the partition randomly, but keep the value within the number of instances. For example, with 2 consumer instances the randomly generated value is either 0 or 1.

Create a MyPartitionKeyStrategy class that implements the PartitionKeyExtractorStrategy interface:

import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;

import java.util.Random;

public class MyPartitionKeyStrategy implements PartitionKeyExtractorStrategy {
    @Override
    public Object extractKey(Message<?> message) {

        com.microservice.stream.controller.Message sendMsg = (com.microservice.stream.controller.Message) message.getPayload();

        Random random =new Random();

        final int r = random.nextInt(2);

        System.out.println("r:" + r);

        return r;
    }
}

Register the partitioning strategy as a bean in a configuration class:

@Configuration
public class StrategyConf {

    @Bean
    MyPartitionKeyStrategy myPartitionKeyStrategy() {
        return new MyPartitionKeyStrategy();
    }
}

Change the producer's YAML configuration: replace partitionKeyExpression with partitionKeyExtractorName, set to the name of the injected bean (the @Bean method name):

spring:
  cloud:
    stream:
      bindings:
        input:
          producer:
            partitionKeyExtractorName: myPartitionKeyStrategy
            partitionCount: 2
          destination: topic
          binder: rabbit1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672

Send the messages through the producer again. They are now distributed randomly across the instances:

Instance 1: [console output screenshot]

Instance 2: [console output screenshot]

In a real enterprise environment, we can implement our own partitioning strategy based on our own needs.
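For instance, a strategy could key on a business field so that all messages for the same id reach the same instance, which is the scenario described in the preface. The following standalone sketch shows that idea without any Spring dependency. Msg, extractKey and partitionFor are hypothetical names, and the selection rule (hash of the key modulo the partition count) is an assumption for illustration.

```java
/** Hypothetical sketch; none of these types come from Spring Cloud Stream. */
final class IdKeyDemo {

    /** Simplified stand-in for the article's Message entity. */
    static final class Msg {
        final int id;
        final String body;

        Msg(int id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    /** Uses the message id as the partition key instead of a random number. */
    static Object extractKey(Msg msg) {
        return msg.id;
    }

    /** Assumed selection rule: hash of the key modulo the partition count. */
    static int partitionFor(Msg msg, int partitionCount) {
        return Math.floorMod(extractKey(msg).hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        Msg[] messages = {new Msg(1, "login"), new Msg(2, "login"), new Msg(1, "logout")};
        for (Msg msg : messages) {
            System.out.println("id=" + msg.id + " body=" + msg.body
                    + " -> partition " + partitionFor(msg, 2));
        }
    }
}
```

In a real application the same logic would sit inside extractKey of a PartitionKeyExtractorStrategy implementation, returning the id and leaving the partition selection to the framework.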

Conclusion

This chapter covered how to use partitioning in Spring Cloud Stream: testing without partitions, enabling partitioning, and customizing the partitioning strategy. Requirements like these come up regularly in real development, so it is worth understanding the feature well enough to apply it.
