Kafka Learning Notes (1)

What is Kafka

Kafka was originally an internal project at LinkedIn and was later donated to the Apache Foundation. Kafka is a high-performance, distributed, publish/subscribe-based messaging system; it can also be described as a distributed, partitioned, replicated, and persistent log service.

Why use message queues

When the speed or stability of production and consumption differ across a system, a message queue is needed as an abstraction layer to bridge the gap. A "message" is a unit of data transferred between two computers. A message can be very simple, such as a plain text string, or more complex, perhaps containing embedded objects. Messages are sent to queues, which are containers that hold messages during their transmission. A toy analogy follows below.
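As an in-process analogy (not Kafka itself), Java's BlockingQueue illustrates the idea: the producer and consumer share nothing but the queue, and the queue absorbs the difference in their speeds. A minimal sketch:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueAnalogy {
    public static void main(String[] args) {
        // The "container" that holds messages during their transmission
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        new Thread(() -> { // fast producer
            try {
                for (int i = 0; i < 5; i++) queue.put("message-" + i);
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }).start();
        new Thread(() -> { // slow consumer
            try {
                while (true) {
                    Thread.sleep(500); // deliberately slower than production
                    System.out.println("consumed: " + queue.take());
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }).start();
    }
}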

Benefits of using message queues

Improved response speed

With a message queue, the producer side simply throws a message into the queue and can immediately return a response to the user; there is no need to wait for the processing result. The user can fetch the result later (much like picking up a test report at a hospital), or the producer can subscribe to be notified of the result (for example, by leaving a phone number, or by implementing a listener interface and joining a listening queue). The two sides can also agree in advance on where the result will be placed, with no notification at all.
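As an illustration of this "respond now, be notified later" pattern, here is a minimal sketch using the Kafka Java client's asynchronous send with a callback. The topic name and broker address are placeholders of my choosing; the client API is the same one used in the demo later in this article:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class AsyncNotifyDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Throw the message in the queue; the callback plays the role of the "notification"
            producer.send(new ProducerRecord<>("order-events", "order accepted"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace(); // enqueue failed
                        } else {
                            System.out.println("queued at offset " + metadata.offset());
                        }
                    });
            // The caller can already respond to the user here, without waiting.
        }
    }
}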

Improved stability

Consider an e-commerce system that places orders and sends the order data to a production system. The network between the two may go down, or the production system may suspend service for maintenance. Without a message queue, the e-commerce system's order data would be lost and customers could not place orders, hurting the business. Two systems should not be so tightly coupled; they should be decoupled via a message queue, which also makes the overall system more robust and stable.

Reduce service coupling

In traditional designs, one call to a service may involve multiple downstream calls. This couples the service directly to multiple downstream services, and those downstream calls also slow the service down. With a message queue, our service only cares about whether a message is sent successfully; it does not care who consumes it. Services are thereby decoupled from each other and are coupled only to the message queue.

A primer on Kafka

Install the runtime environment

Install ZooKeeper

Since Kafka uses ZooKeeper as the cluster registry, we first need to deploy a ZooKeeper instance on our own machine. ZooKeeper is written in Java, so the machine must have a Java runtime environment. ZooKeeper download address: zookeeper.apache.org/releases.ht… For a single-machine deployment, decompress the downloaded package, go to ZooKeeper's conf directory, and rename zoo_sample.cfg to zoo.cfg:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/Users/vector/codeTools/zookeeper-3.6.2/data
dataLogDir=/Users/vector/codeTools/zookeeper-3.6.2/dataLogs
# the port at which the clients will connect
clientPort=2181
# server address and ports
# server.<process id>=<running IP address>:<port that provides services>:<port that elects the leader>
server.1=127.0.0.1:2181:3181

Run ./zkServer.sh start in the bin directory to start the ZooKeeper service:

vector@VectordeMacBook-Pro bin % ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /Users/vector/codeTools/zookeeper-3.6.2/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
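If you want to confirm from code that ZooKeeper is reachable before moving on, a quick smoke test can be written in Java against the org.apache.zookeeper client. This is an optional sketch of mine, and the 15000 ms session timeout is an arbitrary choice:

import org.apache.zookeeper.ZooKeeper;

public class ZkPing {
    public static void main(String[] args) throws Exception {
        // Connect to the clientPort configured in zoo.cfg above
        ZooKeeper zk = new ZooKeeper("localhost:2181", 15000, event ->
                System.out.println("zk event: " + event.getState()));
        Thread.sleep(2000); // crude wait for the session to establish; fine for a smoke test
        System.out.println("session state: " + zk.getState());
        zk.close();
    }
}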

Kafka run configuration

Download the Kafka release and extract it. Download address: www.apache.org/dyn/closer…. Go to the config directory and edit the server.properties file (single-machine deployment mode):

############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/Users/vector/codeTools/data/kafka_2.13-2.8.0/log/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=localhost:2181/kafka
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

To run Kafka in the background, enter the following command:

bin/kafka-server-start.sh -daemon  config/server.properties
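The demo below uses a topic named kafka-topic-demo. With the broker's default settings the topic is auto-created on first use, but it can also be created explicitly. Here is a minimal sketch using the AdminClient from the same kafka-clients library as the demo; the partition count and replication factor match the single-broker setup above:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- a single-broker setup
            NewTopic topic = new NewTopic("kafka-topic-demo", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("topics: " + admin.listTopics().names().get());
        }
    }
}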

Run the demo

Let’s use the Java language as an example

Producer

package com.vector.kafka.demo.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Producer {

    public static final String BrokerList = "localhost:9092";

    public static final String Topic = "kafka-topic-demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer"."org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer"."org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", BrokerList);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(Topic, "hello kafka!");
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
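One detail worth noting: send() is asynchronous and returns a Future<RecordMetadata>, so the call above does not wait for the broker's acknowledgement (the try-with-resources close() does block until pending records are sent). To block for the result, a synchronous variant of the send line looks like this (a sketch; it also needs an import of org.apache.kafka.clients.producer.RecordMetadata):

// Blocking send: get() waits for the broker's acknowledgement
RecordMetadata meta = producer.send(record).get();
System.out.println("partition=" + meta.partition() + ", offset=" + meta.offset());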

Consumer

package com.vector.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {

    public static final String BrokerList = "localhost:9092";

    public static final String Topic = "kafka-topic-demo";

    public static final String GroupID = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", BrokerList);
        properties.put("group.id", GroupID);
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singletonList(Topic));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
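As written, the loop never exits and the consumer is never closed; the run is stopped with Ctrl+C, which is where the SIGINT exit code in the output below comes from. A common, cleaner pattern (a sketch, not the only way) is to call wakeup() from a shutdown hook so that the blocked poll() aborts and the consumer can be closed. This would replace the while loop in main:

Runtime.getRuntime().addShutdownHook(new Thread(kafkaConsumer::wakeup));
try {
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected on shutdown: wakeup() makes the blocked poll() throw this
} finally {
    kafkaConsumer.close(); // commits offsets (with auto-commit) and leaves the group cleanly
}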

Sample run output

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello kafka!
hello kafka!

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

Thank you for reading; I will continue sharing my Kafka learning in the next installment. This article is only an introduction, and many configuration parameters are not described in detail. I will explain them later!

It has been a long time since I last shared my learning notes, and I will gradually get back into the habit. Let me briefly talk about my internship during this period: the experience has been great. As an intern I am still in the learning stage and do not have many tasks on hand, mainly because the business is highly complex, so my focus is really on learning the business. The atmosphere in the team is also very good; it truly is not about titles, and people usually address each other by name, which is hard to find at other companies! Finally, a new graduation season is approaching. I wish you all satisfactory offers and the right boss!