Recently I have been working on integrating SOFA with Spring Cloud, and I hope a series of demo projects will help people use SOFA and Spring Cloud together more smoothly. You are also very welcome to join the effort and star the repository.
GitHub portal: Spring-Cloud-Sofastack-samples
Kafka overview
The official website, https://kafka.apache.org/, describes it as follows:
Apache Kafka™ is a distributed streaming platform. It lets you:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant, durable way.
- Process streams of records as they occur.
In back-end development, Kafka is most often used as a distributed message queue. A distributed message queue provides application decoupling, traffic peak shaving, message distribution and other capabilities, and has long been an indispensable piece of infrastructure in large Internet service architectures.
Basic concepts
Topic and partition
The topic is the core abstraction Kafka provides for data: it is the category or name under which a stream of records is published. Topics in Kafka support multiple subscribers; that is, a topic can have zero, one, or many consumers subscribing to the data written to it. For each topic, the Kafka cluster maintains a partitioned log.
Partitioning the log serves several purposes:
- The log can scale beyond the capacity of a single server. Each individual partition must fit on the server that hosts it, but a topic can have many partitions, so it can handle an arbitrarily large amount of data.
- Partitions are the unit of parallelism. Kafka divides a topic into partitions and decides which partition each message is stored in according to the partitioning rules. As long as the rules are set properly, messages are distributed evenly across partitions, which provides load balancing and horizontal scaling. In addition, multiple subscribers can consume data from one or more partitions simultaneously, which supports high-throughput processing.
For a more detailed discussion of this design, see the article "Why are topics in Kafka partitioned?".
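As an illustration of working with partitions (not part of the project built later in this article), here is a minimal sketch that creates a multi-partition topic with the Kafka Java AdminClient; the broker address and the topic name "trading" are assumptions chosen to match the examples used further below:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        AdminClient admin = AdminClient.create(props);
        // Create topic "trading" with 3 partitions and replication factor 1 (enough for a single-broker setup)
        admin.createTopics(Collections.singletonList(new NewTopic("trading", 3, (short) 1))).all().get();
        admin.close();
    }
}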
Producers
Producers publish data to the topics of their choice, and the producer is responsible for choosing which partition within the topic each record is assigned to. This can be done by simply balancing load in a round-robin fashion, or according to some semantic partitioning function, for example based on a key in the record.
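As a sketch of these two strategies, using the plain Kafka client rather than the Spring wrapper built later; the broker address and topic name are placeholders:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Without a key, records are spread across partitions (round-robin style load balancing)
        producer.send(new ProducerRecord<>("trading", "a record without a key"));
        // With a key, every record that shares the same key lands in the same partition
        producer.send(new ProducerRecord<>("trading", "order-1001", "a record keyed by order id"));
        producer.close();
    }
}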
Consumers
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can live in separate processes or on separate machines.
- If all consumer instances belong to the same consumer group, records are effectively load-balanced across the instances.
- If the consumer instances all belong to different consumer groups, each record is broadcast to all of them.
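To make the group semantics concrete, here is a minimal sketch of a plain Java consumer (again outside the Spring project built later; addresses and names are placeholders). Running several copies with the same group.id splits the partitions among them, queue style; giving each copy its own group.id delivers every record to every copy, publish-subscribe style:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        // The consumer group this instance joins
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("trading"));
        while (true) {
            // poll(long) is the signature in the 1.x client used in this article
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d, offset=%d, value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}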
(Figure from the Kafka official documentation: a two-server cluster hosting four partitions, consumed by two consumer groups.)
In that figure, a Kafka cluster with two server nodes hosts four partitions (P0-P3), consumed by two consumer groups: group A has two consumer instances and group B has four. More commonly, though, topics have a small number of consumer groups, one per "logical subscriber". Each group is made up of many consumer instances for scalability and fault tolerance. This is publish-subscribe semantics in which the subscriber is a cluster of consumers rather than a single process.

Consumption is implemented in Kafka by dividing the partitions of the log over the consumer instances, so that at any point in time each instance is the exclusive consumer of a "fair share" of the partitions. The process of maintaining consumer group membership is handled dynamically by the Kafka protocol: if new instances join the group, they take over some partitions from other members; if an instance disappears, its partitions are distributed to the remaining instances.

Kafka only guarantees the order of records within a single partition, not a total order across different partitions in a topic. Per-partition ordering, combined with the ability to partition data by key, is sufficient for most applications. If you do need a total order over records, you can use a topic with only one partition, although that means at most one consumer process per consumer group.
Kafka as a messaging system
Messaging systems traditionally have two modes: queue and publish-subscribe.
- In the queuing model, a pool of consumers reads from the server and each record goes to exactly one of them. This lets processing be divided across multiple consumer instances, but once a record is consumed it is gone.
- In the publish-subscribe model, each record is broadcast to all consumers. This lets data reach multiple processes, but processing cannot be scaled out, because every message is delivered to every subscriber.
This article only introduces some of the basic concepts of Kafka as a message queue. For more information, please refer to the official documentation.
Kafka installation
Now let's look at how to install Kafka. Download address: https://kafka.apache.org/downloads. The version used in this article is kafka_2.12-1.1.1.
- Download the release package

> wget http://mirrors.shu.edu.cn/apache/kafka/1.1.1/kafka_2.12-1.1.1.tgz
- Unpack the archive

> tar -zxvf kafka_2.12-1.1.1.tgz
- Modify the configuration file

> cd kafka_2.12-1.1.1/config
> vim server.properties
The main modification items here include the following:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
listeners=PLAINTEXT://192.168.0.1:9092
advertised.listeners=PLAINTEXT://192.168.0.1:9092
# zookeeper address
zookeeper.connect=192.168.0.6:2181
The Kafka service depends on Zookeeper, so the Zookeeper cluster address must be specified in the configuration file before starting. A single Zookeeper instance can be started as follows:
> sh zookeeper-server-start.sh -daemon config/zookeeper.properties
Here I point to a previously deployed Zookeeper machine, so zookeeper.connect simply references that existing address. For details about installing Zookeeper, see Installing Zookeeper on a Linux VM.
Then start the Kafka service:
> sh kafka-server-start.sh config/server.properties
Integrating Kafka with SpringBoot
Build a simple Kafka producer utility module
- Introduce the dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>
- Producer
To make the Kafka producer available to other modules, we wrap it in a utility class and expose it through SpringBoot's auto-configuration mechanism, as follows:
@Configuration
public class KafkaProducerAutoConfiguration {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean
    public KafkaSender kafkaSender() {
        return new KafkaSender(kafkaTemplate);
    }
}
- KafkaSender
public class KafkaSender {

    private KafkaTemplate<String, String> kafkaTemplate;

    public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Send a message to the given topic.
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
- Automatic configuration: register the configuration class in src/main/resources/META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.sofastack.cloud.core.kafka.configuration.KafkaProducerAutoConfiguration
(Screenshot: overall project module structure.)
Test case
In the test project, introduce the dependency packaged by the module above:
<dependency>
<groupId>io.sofastack.cloud</groupId>
<artifactId>sofastack-cloud-core-kafka</artifactId>
</dependency>
- Create a new application-kafka.properties configuration file in the resources directory (the file name must match the @PropertySource annotation in the startup class below):
#============== kafka ===================
# Kafka broker address(es); can be a comma-separated list. 192.168.0.1 here matches the
# listeners configured in the Kafka startup configuration above.
# Note: Kafka may only accept connections by host name rather than IP; if you run into
# this problem, try binding the host name in the local hosts file.
spring.kafka.bootstrap-servers=192.168.0.1:9092

#=============== provider =======================
spring.kafka.producer.retries=0
# number of messages sent per batch
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# serializers for the message key and the message body
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
# default consumer group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100ms
# deserializers for the message key and the message body
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.application.name=kafka-test
logging.path=./logs
- Simulate sending messages in the startup class
@SpringBootApplication
@PropertySource("classpath:application-kafka.properties")
public class ProviderApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext run = SpringApplication.run(ProviderApplication.class, args);
        // In normal use, KafkaSender can simply be injected with @Autowired
        KafkaSender bean = run.getBean(KafkaSender.class);
        for (int i = 0; i < 3; i++) {
            // Invoke the send method of the producer utility class
            bean.sendMessage(KafkaContants.TRADE_TOPIC, "send a test message");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
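The KafkaContants class referenced above is not shown in the original snippets; a minimal sketch could look like the following, where the topic name is an assumption chosen to match the trading topic used at the end of this article:

public class KafkaContants {

    // Hypothetical value; any topic name works as long as producer and consumer agree on it
    public static final String TRADE_TOPIC = "trading";
}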
- Write the consumer. In a SpringBoot project, the consumer implementation is very simple:
@Component
public class KafkaReceiver {

    // The listener's groupId matches the group configured in the properties file
    @KafkaListener(topics = { KafkaContants.TRADE_TOPIC }, groupId = "test-consumer-group")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println(message);
        }
    }
}
After starting the project, you can use the Kafka console producer to send a few messages and watch the consumer print them on the console:
> sh kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic trading
After executing the above command, the command line waits for input. Type glmapper and then SOFA; the consumer application should print both values to its console.
References
- Introduction
- Kafka official Documentation