Message-oriented middleware is essential in today's increasingly complex applications, and different businesses may choose different products, such as RabbitMQ or Kafka. Each has its own logical structure, so switching from RabbitMQ to Kafka can be a painful migration. Spring Cloud Stream addresses this problem by hiding the differences between messaging middleware behind a common messaging model.
1. Introduction
Spring Cloud Stream uses inputs, outputs, and binders to connect applications: an output is the message sender, an input is the message receiver, and a binder attaches both to the middleware through channels. Developers do not need to care which messaging middleware is in use; they only implement the corresponding interfaces.
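To make the output/binder/input relationship concrete, here is a minimal plain-Java sketch. It is a toy model, not the Spring Cloud Stream API: the names `BinderSketch`, `Binder`, `InMemoryBinder`, and `roundTrip` are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of the output/binder/input idea; NOT the Spring Cloud Stream API. */
public class BinderSketch {

    /** Hypothetical stand-in for a middleware binder: routes payloads by destination name. */
    interface Binder {
        void publish(String destination, String payload);
        void subscribe(String destination, Consumer<String> handler);
    }

    /** In-memory binder; a RabbitMQ or Kafka binder would sit behind the same interface. */
    static class InMemoryBinder implements Binder {
        private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

        @Override
        public void publish(String destination, String payload) {
            // Deliver the payload to every handler subscribed to this destination.
            for (Consumer<String> handler : subscribers.getOrDefault(destination, List.of())) {
                handler.accept(payload);
            }
        }

        @Override
        public void subscribe(String destination, Consumer<String> handler) {
            subscribers.computeIfAbsent(destination, key -> new ArrayList<>()).add(handler);
        }
    }

    /** Sends one message through the binder and returns what the input side received. */
    static List<String> roundTrip(Binder binder, String destination, String payload) {
        List<String> received = new ArrayList<>();
        binder.subscribe(destination, received::add); // input side
        binder.publish(destination, payload);         // output side
        return received;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new InMemoryBinder(), "stream-test", "hello"));
    }
}
```

The sender and receiver only see the `Binder` interface, so swapping the implementation does not change their code. That is the decoupling the framework aims for.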
2. Install RabbitMQ
For installation steps, see the author's other article, "RabbitMQ and Kafka: Win10 installation tutorial" (www.jianshu.com/p/63d32ab93…
After RabbitMQ has started, open http://localhost:15672/#/ in the browser. If the RabbitMQ management page is displayed, RabbitMQ is installed and running.
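As an alternative to the browser check, a short program can confirm that the broker ports accept connections. This is only a sketch: the class name `PortCheck` and the method `isOpen` are hypothetical, and it assumes RabbitMQ runs on localhost with the AMQP port 5672 (the one used in the configuration later in this article) and the management port 15672.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: treat the port as closed.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed ports: 5672 for AMQP clients, 15672 for the management UI.
        System.out.println("AMQP 5672 open: " + isOpen("localhost", 5672, 1000));
        System.out.println("Management 15672 open: " + isOpen("localhost", 15672, 1000));
    }
}
```

An open port only shows that something is listening there; the management page remains the more reliable confirmation.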
3. Set up stream-producer-8700
Create a new sub-project, springcloud-stream-producer-8700 (stream-producer-8700 for short), with the following structure:
3.1 Add dependencies to pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloudtest</artifactId>
<groupId>com.elio.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>springcloud-stream-producer-8700</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<!-- Hot start plugin -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<addResources>true</addResources>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.2 Add the application.yml configuration file
The entries under cloud.stream.bindings are the channels to bind. Here myOutput is a custom output channel name, and destination is the target it sends to.
server:
  port: 8700 # port
spring:
  application:
    name: springcloud-stream-producer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        myOutput:
          destination: stream-test
eureka:
  instance:
    instance-id: ${spring.application.name}:${server.port}
  client:
    fetch-registry: true
    register-with-eureka: true
    service-url:
      defaultZone: http://localhost:8300/eureka/,http://localhost:8301/eureka/
3.3 Add the StreamProducer8700 main startup class
The main startup class needs no special annotations.
package com.elio.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

// Standard Spring Boot entry point; @EnableDiscoveryClient registers the service with Eureka.
@SpringBootApplication
@EnableDiscoveryClient
public class StreamProducer8700 {
    public static void main(String[] args) {
        SpringApplication.run(StreamProducer8700.class, args);
    }
}
3.4 Add the custom MySource interface
Spring Cloud Stream ships with a standard Source interface, but you can define your own. Production code usually uses custom interfaces, so we do the same here.
package com.elio.springcloud.service;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component
public interface MySource {
    // The channel name must match the binding name in application.yml.
    @Output("myOutput")
    MessageChannel myOutput();
}
3.5 Add SendService
The Controller calls this service class to send messages.
package com.elio.springcloud.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;

// Binds the channels declared in MySource and sends messages through myOutput.
@EnableBinding({MySource.class})
public class SendService {
    @Autowired
    private MySource mysource;

    public void sendMsg(String msg) {
        // Wrap the payload in a Message and send it to the bound destination.
        mysource.myOutput().send(MessageBuilder.withPayload(msg).build());
    }
}
3.6 Add the StreamProducerController class
package com.elio.springcloud.controller;

import com.elio.springcloud.service.SendService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StreamProducerController {
    @Autowired
    private SendService sendService;

    // GET /send/{msg} forwards the path variable to the message channel.
    @GetMapping(value = "/send/{msg}")
    public void send(@PathVariable("msg") String msg) {
        sendService.sendMsg(msg);
    }
}
4. Set up stream-consumer-8800
Create a new sub-project, springcloud-stream-consumer-8800 (stream-consumer-8800 for short), as the message receiver.
4.1 Add dependencies to pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloudtest</artifactId>
<groupId>com.elio.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>springcloud-stream-consumer-8800</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<!-- Hot start plugin -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<addResources>true</addResources>
</configuration>
</plugin>
</plugins>
</build>
</project>
4.2 Add the application.yml configuration file
Here myInput is a custom input channel name. Its destination must match the one used by the producer.
server:
  port: 8800 # port
spring:
  application:
    name: springcloud-stream-consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        myInput:
          destination: stream-test
eureka:
  instance:
    instance-id: ${spring.application.name}:${server.port}
  client:
    fetch-registry: true
    register-with-eureka: true
    service-url:
      defaultZone: http://localhost:8300/eureka/,http://localhost:8301/eureka/
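The producer and consumer only exchange messages when their bindings name the same destination (stream-test in both files). The nested YAML corresponds to the flat property key spring.cloud.stream.bindings.<bindingName>.destination. The following plain-Java sketch illustrates that mapping; `BindingCheck`, `destinationKey`, and `sameDestination` are hypothetical helper names, not part of Spring Cloud Stream.

```java
import java.util.Properties;

public class BindingCheck {

    /** Flat property key that the nested YAML under spring.cloud.stream.bindings maps to. */
    static String destinationKey(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    /** True when both bindings are configured and name the same destination. */
    static boolean sameDestination(Properties producer, String outputBinding,
                                   Properties consumer, String inputBinding) {
        String out = producer.getProperty(destinationKey(outputBinding));
        String in = consumer.getProperty(destinationKey(inputBinding));
        return out != null && out.equals(in);
    }

    public static void main(String[] args) {
        // Values copied from the two application.yml files in this article.
        Properties producer = new Properties();
        producer.setProperty(destinationKey("myOutput"), "stream-test");
        Properties consumer = new Properties();
        consumer.setProperty(destinationKey("myInput"), "stream-test");
        System.out.println(sameDestination(producer, "myOutput", consumer, "myInput"));
    }
}
```

If the two destination values differ, both applications still start, but the consumer never sees the producer's messages.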
4.3 Add the StreamConsumer8800 main startup class
package com.elio.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

// Standard Spring Boot entry point; @EnableDiscoveryClient registers the service with Eureka.
@SpringBootApplication
@EnableDiscoveryClient
public class StreamConsumer8800 {
    public static void main(String[] args) {
        SpringApplication.run(StreamConsumer8800.class, args);
    }
}
4.4 Add the MySink interface
package com.elio.springcloud.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {
    // The channel name must match the binding name in application.yml.
    @Input("myInput")
    SubscribableChannel myInput();
}
4.5 Add the ReceiveService class
package com.elio.springcloud.message;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(MySink.class)
public class ReceiveService {
    // Invoked for every message that arrives on the myInput channel.
    @StreamListener("myInput")
    public void receive(Object payload) {
        System.out.println(payload);
    }
}
5. Test
Start eureka-server-8300, eureka-server-8301, stream-producer-8700, and stream-consumer-8800 in that order. Once they are running, call the send endpoint of stream-producer-8700:
http://localhost:8700/send/hello%20world%20stream
Then check the stream-consumer-8800 console, which should show that the consumer has received the message.
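The same request can be sent from code instead of the browser. The sketch below uses the JDK's built-in HTTP client (Java 11 or later) and assumes the producer is running on localhost:8700 as configured above. `SendClient`, `sendUri`, and `send` are hypothetical names chosen for this example.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SendClient {

    /** Builds the /send/{msg} URL; the multi-argument URI constructor percent-encodes spaces. */
    static URI sendUri(String host, int port, String msg) throws URISyntaxException {
        return new URI("http", null, host, port, "/send/" + msg, null, null);
    }

    /** Issues the GET request against the producer and returns the HTTP status code. */
    static int send(String msg) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(sendUri("localhost", 8700, msg))
                .GET()
                .build();
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("HTTP status: " + send("hello world stream"));
    }
}
```

Building the URI through the constructor avoids encoding the spaces by hand. A message that contains "/" would need extra handling, because it would be read as an additional path segment.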
6. Summary
In this article, we implemented a simple message producer and consumer with Spring Cloud Stream. The application code never talks to RabbitMQ directly; it works only with inputs, outputs, and the binder, which hides the details of the underlying messaging middleware and decouples the application from it.