Preface
I have recently been doing microservice development involving some data-processing modules. Each piece of processing business is developed as an independent service, which makes it easy to extend and to orchestrate the flow later. I looked into the Spring Cloud Data Flow framework, but it felt too heavy for us and troublesome to maintain, so, following the same flow-orchestration idea, I built a simple flow orchestration on top of our current technology stack.
To put it simply, we want the flow orchestration to make microservices pluggable, and to allow a microservice's data input and output to be changed without a shutdown.
Preparatory work
Nacos installation and usage
If you want to try this yourself, a Docker installation is recommended:
- Pull the image: docker pull nacos/nacos-server
- Create the service: docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server
Then open http://<server-ip>:8848/nacos in a browser; the default account is nacos and the password is nacos.
Docker lets us install the service quickly and cuts down the time spent preparing the environment.
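If the console does not come up, the container logs usually show why (standard Docker commands):

docker ps --filter name=nacos    # confirm the container is running
docker logs -f nacos             # watch the startup logs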
Prepare three Spring Boot services and add the Nacos and Kafka dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.0.RELEASE</version>
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-config-spring-boot-starter</artifactId>
    <version>0.2.1</version>
</dependency>
The configuration file
spring:
  kafka:
    bootstrap-servers: kafka-server:9092
    producer:
      acks: all
    consumer:
      group-id: node1-group # node1, node2, and node3 each use their own group id
      enable-auto-commit: false
# Deployed Nacos service
nacos:
  config:
    server-addr: nacos-server:8848
When setting up your local hosts file, it is recommended to use hostnames such as kafka-server and nacos-server rather than the services' IP addresses.
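For example, the mapping could look like this (the IP addresses below are placeholders for wherever your Kafka and Nacos actually run):

# /etc/hosts (Windows: C:\Windows\System32\drivers\etc\hosts)
192.168.1.10  kafka-server
192.168.1.11  nacos-server

This way, when the environment changes, only the host mapping needs to be updated rather than every service's configuration file.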
Business interpretation
Now we need to orchestrate the three services so that each service can be plugged in or unplugged and its position in the flow adjusted:
- The node1 service listens to the data stream sent by the front-end service, so node1's input topic is the front-end data service's output topic
- node2 listens to the data processed by node1, so node2's input topic is node1's sink topic; likewise for node3, which after processing sends its data to the end of the data stream
- If we now need to adjust the flow and remove node2-server, we only need to change node1's sink topic to node2's sink topic, so that node1 feeds node3 directly (see the sketch after this list). This lets these services be embedded flexibly into the data-flow processing of different projects, achieving plug and play (the data format at the business level still needs to be agreed upon, of course)
- Dynamic adjustability also means that when a problem occurs at one node, the data flow can be redirected immediately, for example to a temporary data service, to avoid piling up too much data in Kafka and unbalancing throughput
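To make the re-wiring concrete, here is a sketch of the topic values each service would hold in Nacos; the topic names are illustrative only, not from a real deployment:

# Normal flow: front-end -> node1 -> node2 -> node3 -> end of stream
node1-server:  input = frontend-out   sink = node1-out
node2-server:  input = node1-out      sink = node2-out
node3-server:  input = node2-out      sink = flow-end

# After removing node2-server, only node1's sink value changes:
node1-server:  input = frontend-out   sink = node2-out   # node1 now feeds node3 directly
node3-server:  input = node2-out      sink = flow-end    # unchanged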
Nacos configuration
Create a configuration
Generally, each service in the flow orchestration has one input and one output, namely input and sink. Therefore each service needs two topic configurations, an input topic and a sink topic, and we add the input and sink configurations in Nacos.
A Nacos configuration item requires a groupId and a dataId. Usually the service name is used as the groupId and the name of the configuration item as the dataId; for example, the node1-server service has an input configuration item with groupId node1-server and dataId input.
Configure the remaining services in the same way through the Nacos console.
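Besides the console UI, configuration items can also be published through the Nacos HTTP Open API, which is handy for scripting. A sketch against Nacos 1.x, with illustrative topic values:

# Publish the input and sink configuration items for node1-server
curl -X POST "http://nacos-server:8848/nacos/v1/cs/configs" -d "dataId=input&group=node1-server&content=frontend-out"
curl -X POST "http://nacos-server:8848/nacos/v1/cs/configs" -d "dataId=sink&group=node1-server&content=node1-out"
# Read one back to verify
curl "http://nacos-server:8848/nacos/v1/cs/configs?dataId=input&group=node1-server"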
Reading configuration
@Configuration
@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
// autoRefreshed = true means the value is refreshed whenever the configuration in Nacos changes;
// false means only the value read at service startup is used
@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
public class NacosConfig {

    @NacosValue(value = "${input:}", autoRefreshed = true)
    private String input;

    @NacosValue(value = "${sink:}", autoRefreshed = true)
    private String sink;

    public String getInput() {
        return input;
    }

    public String getSink() {
        return sink;
    }
}
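A quick way to verify that auto-refresh behaves as expected is a small throwaway endpoint. This is a sketch of my own (the class name and path are arbitrary, and it assumes spring-boot-starter-web is on the classpath), not part of the orchestration itself: change input or sink in the Nacos console, call the endpoint again, and the new values should appear without a restart.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical debug endpoint for checking the refreshed values at runtime
@RestController
public class NacosConfigDebugController {

    @Autowired
    private NacosConfig nacosConfig;

    // Returns the topics currently held by NacosConfig
    @GetMapping("/debug/topics")
    public String topics() {
        return "input=" + nacosConfig.getInput() + ", sink=" + nacosConfig.getSink();
    }
}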
Listening for configuration changes
The service's input side needs to create a consumer at startup, re-create the consumer whenever the input topic changes, and shut down the consumer for the old topic. The output side is business-driven and needs no listener: the latest sink topic is read from configuration each time the service sends a message. Because autoRefreshed = true in the configuration class above only refreshes the values held in NacosConfig, the service still needs to be notified of configuration changes in order to drive the consumer-creation business, so we also register a Nacos configuration listener.
/**
 * Listens for Nacos configuration changes, creates the consumer, and updates consumption
 */
@Component
public class ConsumerManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerManager.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Autowired
    private NacosConfig nacosConfig;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Stores the topic used by the current consumer
    private String topic;

    // Runs the consumer thread
    private ExecutorService executorService;

    /**
     * Listens for changes to the input configuration
     */
    @NacosConfigListener(dataId = "input", groupId = "node1-server")
    public void inputListener(String input) {
        // By the time this listener fires, the input value in NacosConfig is already the
        // latest value; the listener only triggers our consumer-update logic
        String inputTopic = nacosConfig.getInput();
        // nacosConfig is read here rather than the raw callback payload, because the payload
        // can arrive as "input=xxxx" instead of "xxxx"; the framework behind NacosConfig
        // has already parsed it for us
        // If the local topic variable already has a value we are updating an existing
        // consumer, otherwise we are creating one
        if (topic != null) {
            // Stop the old consumer thread
            executorService.shutdownNow();
            executorService = null;
        }
        // Create a consumer for the new topic
        topic = inputTopic;
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(topic + "-pool-%d").build();
        executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(2), threadFactory);
        // Run the consumer loop
        executorService.execute(() -> consumer(topic));
    }

    /**
     * Creates the consumer
     */
    public void consumer(String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", servers);
        properties.put("enable.auto.commit", enableAutoCommit);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", groupId);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Duration duration = Duration.ofSeconds(1L);
                ConsumerRecords<String, String> records = consumer.poll(duration);
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    // Process the data; the business implementation is omitted here
                    String handleMessage = handle(message);
                    // After processing, send to the next node
                    kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
                }
                consumer.commitAsync();
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }

    /**
     * Data-processing placeholder; the business logic is omitted in this article
     */
    private String handle(String message) {
        return message;
    }
}
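The output side, as described earlier, needs no listener at all: resolving the sink topic from NacosConfig at send time is enough for a topic change to take effect on the very next message. A minimal sketch of what a send helper could look like (the class and method names are mine, not from the project):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical output-side helper: every send resolves the sink topic anew,
// so changing "sink" in Nacos re-routes the flow without a restart
@Component
public class SinkSender {

    @Autowired
    private NacosConfig nacosConfig;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String handledMessage) {
        // getSink() always returns the latest value because autoRefreshed = true
        kafkaTemplate.send(nacosConfig.getSink(), handledMessage);
    }
}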
Conclusion
The idea behind flow orchestration is that the direction of the data flow as a whole can be adjusted. Implementing your own dynamic-adjustment scheme against your actual requirements, using the APIs that mainstream frameworks provide, can help you better understand the ideas and principles behind flow orchestration. In real business there are still many problems to work through before services become truly pluggable. I currently work at a traditional company where, for various reasons, it is hard to push the adoption of new frameworks, so I often combine pieces of our existing technology stack to pull off tricks like this. I offer it for your reference and welcome your advice.