1. Terminology

1. Broker

Brokers are instances of Kafka. Each server can run one or more Kafka instances. Brokers in a Kafka cluster are distinguished by distinct numeric ids.

2. Topics and messages

Kafka organizes all messages into topics, and each topic can be divided into partitions. A partition holds messages one after another; each message is tagged with an increasing sequence number (its offset) that records the order in which it arrived, and messages are stored in the partition in that order.
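As a rough illustration of the append-only, offset-ordered storage described above (a plain-Java sketch, not Kafka's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch (not Kafka's real code): a partition as an append-only log.
// Each appended message gets the next increasing offset, fixing its order.
public class PartitionLog {
    private final List<String> messages = new ArrayList<>();

    // Append a message and return the offset assigned to it.
    public long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // Read the message stored at a given offset.
    public String read(long offset) {
        return messages.get((int) offset);
    }

    public static void main(String[] args) {
        PartitionLog p = new PartitionLog();
        long o1 = p.append("m1");
        long o2 = p.append("m2");
        System.out.println(o1 + "," + o2 + "," + p.read(0)); // prints 0,1,m1
    }
}
```

Because the log only supports appends and offset-based reads, both operations are O(1), which is also why Kafka can rely on sequential disk access (see the persistence section below).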

3. Partition

Each topic is divided into one or more partitions. Choosing an appropriate number of partitions in advance makes later Kafka cluster expansion easier and improves concurrent consumption capacity. Kafka guarantees message order only within a single partition.

4. Replication

Each partition has multiple replicas, which act as backups. If the primary replica (the leader) fails, a follower is elected to become the new leader. The default maximum number of replicas in Kafka is 10, and the number of replicas cannot exceed the number of brokers. A follower and its leader are always on different machines, and a single machine stores at most one replica of a given partition.

5. Producer

To produce a message, a producer must specify three things: which topic to send the message to, the key used to route the message to one of the partitions, and the message content itself.
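The key-to-partition routing can be sketched like this (plain Java; Kafka's default partitioner actually hashes the serialized key with murmur2, but the idea is the same): messages with the same key always land in the same partition.

```java
// Sketch of key-based partition selection (illustrative, not Kafka's
// murmur2-based default partitioner): hash the key, then take it modulo
// the partition count so equal keys map to the same partition.
public class KeyPartitioner {
    public static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is a valid partition index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Since the mapping is deterministic, all messages for one key preserve their order, because they go through a single partition.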

6. Consumer

Each consumer labels itself with a consumer group. Kafka delivers each message to every subscribed group, but within a group only one consumer actually consumes it.

Message systems generally offer two consumption models. Push: messages arrive with low latency, but the broker ignores the consumer's capacity and saturation, so producers can easily overwhelm consumers. Pull: the consumer controls its own consumption speed and will not be oversaturated, but when there is no data it polls emptily, wasting CPU.
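The pull model's empty-polling cost can be bounded with a poll timeout, which is roughly how Kafka's consumer `poll(timeout)` behaves. A plain-Java sketch (not Kafka's client code), using a blocking queue to stand in for the broker:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the pull model (plain Java, not Kafka's client): the consumer
// pulls at its own pace, and a bounded wait avoids busy-spinning when the
// "broker" has no data.
public class PullDemo {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // "Producer" side: hand a message to the broker.
    public void send(String message) {
        queue.add(message);
    }

    // "Consumer" side: pull one message, waiting at most timeoutMs when the
    // queue is empty instead of spinning the CPU; null means nothing arrived.
    public String pull(long timeoutMs) throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

The consumer decides how fast to call `pull`, so it can never be pushed past saturation; the timeout is the trade-off between latency and CPU spent on empty polls.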

7. Message delivery semantics

  1. Producer perspective

    1. At most once: the producer sends asynchronously, or synchronously with retries set to 0
    2. At least once: the producer sends synchronously and retries when the send fails or times out
    3. Exactly once: each message is delivered once and only once
  2. Consumer perspective

    1. At most once: the consumer reads the message, commits the offset, then processes the message
    2. At least once: the consumer reads the message, processes it, then commits the offset
    3. Exactly once: each message is processed once and only once
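The consumer-side difference comes down to the order of "commit offset" and "process". A plain-Java simulation (illustrative only) of a consumer that crashes between the two steps and then restarts from the committed offset:

```java
// Sketch (plain Java, not Kafka code): simulate a consumer that crashes
// between committing the offset and processing message 0, then restarts
// from the committed offset, to show the two delivery semantics.
public class DeliverySemantics {

    // If commitFirst, the offset is committed before processing (at most once);
    // otherwise it is committed after processing (at least once).
    // Returns how many times message 0 ends up processed across the restart.
    public static int timesProcessed(boolean commitFirst) {
        int committedOffset = 0;
        int processedCount = 0;
        // First run: crash happens between the two steps.
        if (commitFirst) {
            committedOffset = 1;      // commit, then crash before processing
        } else {
            processedCount++;         // process, then crash before committing
        }
        // Restart: resume from the committed offset.
        if (committedOffset == 0) {
            processedCount++;         // message 0 is redelivered and reprocessed
        }
        return processedCount;        // 0 = lost (at most once), 2 = duplicate (at least once)
    }
}
```

Commit-first can lose the message (processed 0 times); process-first can duplicate it (processed twice). Exactly once requires making the processing and the offset commit atomic.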

8. Kafka features

1. Availability

In Kafka, all nodes are normally in sync; when a node falls out of sync, the system must tolerate the fault. "In sync" means the node can communicate with ZooKeeper and, if the node is a follower, its replicated position does not lag too far behind the leader's. The in-sync nodes of a partition form a set called the partition's in-sync replicas (ISR).

Kafka is fault-tolerant in two ways. Replication: when the replica count is N, there is one leader and N-1 followers. A follower can be regarded as a consumer of the leader: it pulls the leader's messages and appends them to its own log. Failover: when the leader falls out of sync, a new leader is elected from the ISR; when a follower falls out of sync, the leader removes it from the ISR, and once the follower recovers and finishes catching up it re-enters the ISR. A produced message is considered successfully committed only once it is acknowledged by all replicas in the ISR, and only committed messages can be consumed.
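The ISR bookkeeping described above can be sketched in plain Java (illustrative only, not Kafka's controller logic): the leader is always taken from the ISR, out-of-sync replicas leave the set, and recovered replicas rejoin as followers.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch (plain Java): ISR membership for one partition. The head of the
// deque is the current leader; failover promotes the next ISR member.
public class IsrDemo {
    private final Deque<String> isr = new ArrayDeque<>();

    public IsrDemo(String... replicas) {
        for (String r : replicas) {
            isr.add(r);
        }
    }

    // The current leader is the first in-sync replica.
    public String leader() {
        return isr.peekFirst();
    }

    // A replica fell out of sync: drop it from the ISR. If it was the
    // leader, the next in-sync replica implicitly takes over.
    public void outOfSync(String replica) {
        isr.remove(replica);
    }

    // A replica caught up again: it re-enters the ISR as a follower.
    public void caughtUp(String replica) {
        isr.addLast(replica);
    }
}
```

Note how a recovered former leader rejoins at the back of the set: it becomes a follower of the new leader rather than reclaiming leadership.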

Unclean leader election: if all N replicas go down, the partition becomes unavailable when nodes recover because the ISR is empty. Kafka can apply a degradation measure, electing the first node to recover as leader and serving whatever data it has.

2. Consistency

The scheme above ensures availability, but high availability sometimes comes at the expense of consistency. To favor consistency, take the following measures:

Disable unclean leader election

Set the minimum ISR size via min.insync.replicas

3. Persistence

Kafka relies on disk rather than memory.

Sequential reads and writes: the data structure is a queue that supports only offset-based reads and appends, so both operations are O(1).

2.Kafka-Producer

Reference blog.csdn.net/qq_28410283…

  1. parameter

    1. bootstrap.servers, used to find the Kafka cluster

      This parameter specifies a set of host:port pairs used to create connections to the Kafka brokers, such as k1:9092,k2:9092,k3:9092.

      If a Kafka cluster has many machines, you only need to specify a few brokers rather than list them all, because the producer uses this parameter only to bootstrap and then discovers every broker in the cluster, no matter how many machines were specified. Listing multiple machines is purely for failover: if one broker goes down, the producer can still connect to the Kafka cluster through the other brokers specified here.

    2. key.serializer, value.serializer, serialize the message

      Any message sent to the broker must be in the form of a byte array, so each part of the message must be serialized before being sent. These parameters specify how the message key and value are serialized; each takes the fully qualified name of a class implementing the org.apache.kafka.common.serialization.Serializer interface.

    3. acks, to ensure message durability

      The acks parameter controls the durability of messages the producer sends. For the producer, Kafka cares about the durability of "committed" messages: once a message is successfully committed, it is considered not lost as long as any replica holding it stays alive. acks specifies how many replicas the leader broker must confirm have successfully written the message before it responds to the producer. It currently takes three values: 0, 1, and all.

    4. buffer.memory, buffers data to be sent

      This parameter specifies the size, in bytes, of the buffer the producer uses to cache messages. The default is 33554432, i.e. 32MB. The Java producer is designed to send asynchronously: it first places messages to be sent in an in-memory buffer, and a dedicated I/O thread reads from the buffer and actually sends them. The size of that memory area is set by buffer.memory. If the producer writes messages into the buffer faster than the I/O thread can drain it, the buffer fills up; the producer then blocks and waits for the I/O thread to catch up, and throws an exception if it fails to catch up within a certain period. If the producer program sends to many partitions, set this parameter carefully so that an undersized buffer does not reduce the program's overall throughput.

    5. compression.type, how to compress data

      Sets whether the producer compresses messages. The default is none, i.e. no compression. Enabling compression on the producer can significantly reduce network I/O overhead and thus improve overall throughput, at the cost of extra CPU on the producer machine. In addition, if the broker's compression setting differs from the producer's, the broker spends additional CPU decompressing and recompressing messages as it writes them. Kafka currently supports three compression algorithms: GZIP, Snappy, and LZ4. In practice, LZ4 combined with the producer performs best: LZ4 > Snappy > GZIP.

    6. retries, the number of retries

      A Kafka broker may fail to process a write request due to a transient failure (such as a momentary leader election or network jitter). Such failures usually heal themselves, and if they were surfaced to the producer as an exception wrapped in a callback, there would be little the producer could do except resend the message from the callback. Instead, the producer should retry automatically, and the Java producer does so internally, provided the retries parameter is set.

    7. batch.size, the batch send size

      The producer packs multiple messages bound for the same partition into a batch; when the batch is full, all messages in it are sent at once. However, the producer does not always wait for a batch to fill before sending: it may send a batch that still has plenty of free space. The batch size clearly matters. Generally, a small batch carries only a few messages, so each send request writes few messages and producer throughput is low; a very large batch puts heavy pressure on memory, because the producer allocates a fixed amount of memory for the batch whether or not it can be filled. The batch.size setting is therefore a time/space trade-off. The default is 16384, i.e. 16KB, which is actually quite conservative; increasing it appropriately in practice usually improves producer throughput.

    8. linger.ms. The producer sends a batch as soon as either condition is met: batch.size is reached or linger.ms elapses.
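The producer parameters above can be collected into a java.util.Properties object, as they would be passed to a KafkaProducer. The keys are the standard producer configuration names from this section; the host names and tuning values here are placeholders, not recommendations.

```java
import java.util.Properties;

// Sketch: producer configuration built from the parameters discussed above.
// Host names and specific values are illustrative.
public class ProducerConfigDemo {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092"); // entry points into the cluster
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");               // wait for all in-sync replicas
        props.put("buffer.memory", "33554432"); // 32MB send buffer (the default)
        props.put("compression.type", "lz4");   // best producer throughput per the text
        props.put("retries", "3");              // retry transient send failures
        props.put("batch.size", "16384");       // 16KB per-partition batch (the default)
        props.put("linger.ms", "20");           // or send after 20 ms, whichever comes first
        return props;
    }
}
```

With these properties, a batch for a partition is flushed either when it reaches 16KB or when 20 ms have passed since its first message, whichever happens first.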

  2. Custom partition selector

3.Kafka-Consumer

Reference blog.csdn.net/qq_28410283…

  1. parameter

    1. bootstrap.servers, used to find the Kafka cluster

      The same as the Java version producer

    2. group.id, the group that manages consumers

      This parameter specifies the name of the consumer group, within which messages are consumed only once

    3. key.deserializer, value.deserializer, deserialize data

      Deserialize the message key and value received from the broker; the counterpart of the producer's serializers.

    4. session.timeout.ms, the time the coordinator takes to detect member crashes

      This is how long it takes to decide that a group member has crashed. If you set it to 5 minutes and a member dies suddenly (e.g. kill -9 or the machine going down), the Kafka component that manages the group (the consumer group coordinator) may need up to 5 minutes to notice. We usually want to shorten this so the coordinator detects failures faster. Historically, this parameter also bounded the maximum time a consumer could spend processing messages: if the interval between two polls exceeded it, the coordinator would decide the consumer could no longer keep up with the rest of the group, "kick" the instance out of the group, and assign its partitions to other consumers.

      Both situations trigger a rebalance, and the second kind is unnecessary.

      In versions 0.10.1.0 and later, session.timeout.ms means only the coordinator's failure-detection time.

    5. max.poll.interval.ms, the maximum time for message processing

      session.timeout.ms was stripped of the meaning "maximum time for the consumer's processing logic"; max.poll.interval.ms now carries that meaning.

    6. auto.offset.reset, the fallback policy when the offset is invalid

      Specifies Kafka's policy when there is no offset information or when the offset is out of bounds (that is, the offset of the message the consumer wants is outside the valid range of the current log). Note in particular that the parameter takes effect only when one of these two conditions holds.

      • earliest: start consuming from the earliest offset. Note that the earliest offset is not necessarily 0.
      • latest: start consuming from the latest offset.
      • none: throw an exception if no offset information is found or the offset is out of bounds. This setting is rarely seen in practice and is seldom used in real business scenarios.
    7. enable.auto.commit, automatic offset commit

      This parameter specifies whether the consumer commits offsets automatically. If set to true, the consumer commits offsets in the background; otherwise, the user must commit offsets manually.

    8. fetch.max.bytes

      Maximum number of bytes of a pull message

    9. max.poll.records

      Maximum number of messages pulled at one time

    10. heartbeat.interval.ms, lets consumer group members quickly learn what is happening on other nodes and start a rebalance as soon as possible

      The key is how the rest of the consumer group learns that a new round of rebalance must begin. When the coordinator decides to start a rebalance, it embeds that decision in the responses to consumers' heartbeat requests, in the form of a REBALANCE_IN_PROGRESS exception, so that the other members see the response and know they need to rejoin the group. Obviously the faster this happens the better, and heartbeat.interval.ms controls exactly that.

    11. connections.max.idle.ms, the idle time of a connection

      The maximum idle time of a connection; a socket connection idle longer than this is closed. If you do not mind the overhead of re-creating sockets, you can set this parameter to -1 and never close connections.
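As with the producer, the consumer parameters above can be gathered into a java.util.Properties object for a KafkaConsumer. The keys are the standard consumer configuration names from this section; the host names, group id, and specific values are placeholders.

```java
import java.util.Properties;

// Sketch: consumer configuration built from the parameters discussed above.
// Host names, the group id, and specific values are illustrative.
public class ConsumerConfigDemo {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
        props.put("group.id", "demo-group");         // messages consumed once per group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("session.timeout.ms", "10000");    // coordinator crash-detection window
        props.put("max.poll.interval.ms", "300000"); // max time allowed between two polls
        props.put("auto.offset.reset", "earliest");  // fallback when no valid offset exists
        props.put("enable.auto.commit", "false");    // commit offsets manually
        props.put("max.poll.records", "500");        // max messages returned per poll
        return props;
    }
}
```

Setting enable.auto.commit to false here pairs with the at-least-once pattern from the semantics section: process the batch first, then commit the offsets.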

  2. @KafkaListener principle

    1. KafkaListenerAnnotationBeanPostProcessor is responsible for processing the annotation, as follows

      // The post-processor callbacks run before and after bean initialization.
      // KafkaListenerAnnotationBeanPostProcessor implements postProcessAfterInitialization,
      // where it registers the annotated methods with the KafkaListenerEndpointRegistrar.
      public interface BeanPostProcessor {

          @Nullable
          default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
              return bean;
          }

          @Nullable
          default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
              return bean;
          }
      }

      // KafkaListenerAnnotationBeanPostProcessor also implements afterSingletonsInstantiated,
      // where the MessageListenerContainers are initialized.
      public interface SmartInitializingSingleton {

          void afterSingletonsInstantiated();
      }

      public class KafkaListenerAnnotationBeanPostProcessor<K, V>
              implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
      }
    2. MessageListenerContainer indirectly inherits Lifecycle; its doStart() method is called after initialization, and the ListenerConsumer loop calls the consumer's poll method to fetch messages, then invokes the @KafkaListener-annotated method to run the business logic

      public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
              extends AbstractMessageListenerContainer<K, V> {

          @Override
          protected void doStart() {
              if (isRunning()) {
                  return;
              }
              if (this.clientIdSuffix == null) { // stand-alone container
                  checkTopics();
              }
              ContainerProperties containerProperties = getContainerProperties();
              checkAckMode(containerProperties);
              Object messageListener = containerProperties.getMessageListener();
              if (containerProperties.getConsumerTaskExecutor() == null) {
                  SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                          (getBeanName() == null ? "" : getBeanName()) + "-C-");
                  containerProperties.setConsumerTaskExecutor(consumerExecutor);
              }
              GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
              ListenerType listenerType = determineListenerType(listener);
              this.listenerConsumer = new ListenerConsumer(listener, listenerType);
              setRunning(true);
              this.startLatch = new CountDownLatch(1);
              this.listenerConsumerFuture = containerProperties
                      .getConsumerTaskExecutor()
                      .submitListenable(this.listenerConsumer);
              try {
                  if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(),
                          TimeUnit.MILLISECONDS)) {
                      this.logger.error("Consumer thread failed to start - does the configured task executor "
                              + "have enough threads to support all containers and concurrency?");
                      publishConsumerFailedToStart();
                  }
              }
              catch (@SuppressWarnings(UNUSED) InterruptedException e) {
                  Thread.currentThread().interrupt();
              }
          }
      }

4.Kafka server principles and parameter analysis

Reference blog.csdn.net/Dream_Flyin…

1. Parameter analysis

  1. broker.id

    Kafka uses a unique integer to identify the broker. The default value is -1. If not specified, Kafka automatically generates a unique value.

  2. log.dirs

    A very important parameter: the directories where Kafka persists messages. It can be set to multiple directories, separated by commas, so that Kafka spreads load evenly across them; if each directory is on a different disk, overall write throughput improves too. The default is /tmp/kafka-logs.

  3. zookeeper.connect

    This parameter has no default value and must be specified. It can be a CSV list. If one ZooKeeper ensemble manages multiple Kafka clusters, a ZooKeeper chroot must be specified for each cluster.

  4. listeners, advertised.listeners

    listeners is the CSV list of endpoints the broker listens on, in the format [protocol]://[hostname]:[port],[protocol]://[hostname]:[port]. Clients use these endpoints to connect to the broker. If the host name is not specified, the default NIC is bound; 0.0.0.0 binds all NICs. Kafka currently supports the PLAINTEXT, SSL, and SASL_SSL protocols. advertised.listeners is similar to listeners, but holds the endpoints advertised to clients. It is mainly used in IaaS environments: for example, a cloud machine with both private and public NICs can set this parameter so that external clients connect through the public IP. Note that a Kafka cluster deployed purely on an intranet needs only listeners; advertised.listeners is needed when intranet and extranet traffic must be distinguished.

  5. unclean.leader.election.enable

    Whether to enable the unclean leader election.

  6. delete.topic.enable

    Whether kafka is allowed to delete topics. By default, Kafka clusters allow users to delete topics and their data.

  7. log.retention.{hours|minutes|ms}

    This group of parameters controls how long message data is retained. If several are set at the same time, ms takes precedence, then minutes, then hours. Kafka judges expiry by the last modified time of the log files.

  8. log.retention.bytes

    The parameter above is the time dimension, and this parameter is the space dimension. It controls how much data the Kafka cluster needs to store for each message log. For partition logs that exceed this parameter, Kafka automatically clears expired log segment files for the partition. The default value of this parameter is -1, which means that Kafka never deletes logs based on the total size of message log files.

  9. min.insync.replicas

    This parameter works together with the producer's acks parameter. It specifies the minimum number of replicas that must successfully write a message before the broker responds to the client; if the broker cannot satisfy this, the client's send is not considered successful. Combined with the producer's acks configuration, it enables the highest level of message durability a Kafka cluster can achieve.

  10. num.network.threads

    Controls the number of threads the broker uses in the background to process requests from the network. The default is 3. These threads mainly handle network I/O, reading and writing buffered data, with almost no I/O waiting; the thread count can be increased moderately.

  11. num.io.threads

    This parameter controls the number of threads the broker uses to actually process network requests. The default is 8: Kafka creates 8 threads that take queued network requests and process them in real time. The num.io.threads threads perform disk I/O, so the value should be larger, since some I/O waiting can occur at peak times; a common guideline is twice the number of CPU cores, and at most three times.

  12. message.max.bytes

    The maximum message size a Kafka broker will accept. The default is about 977KB.

  13. log.flush.interval.messages=10000

    Every time producer writes 10,000 messages, the data is flushed to disk

  14. log.flush.interval.ms=1000

    At 1 second intervals, flush data to disk

  15. socket.send.buffer.bytes, socket.receive.buffer.bytes, socket.request.max.bytes

    The socket send buffer; the socket receive buffer; and socket.request.max.bytes, the maximum request size, which must be smaller than the Java heap size to prevent server OOM.
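The broker parameters above might appear together in a server.properties file roughly like this (a sketch only; paths, host names, and retention values are illustrative, not recommendations):

```properties
# Sketch of a server.properties combining the parameters discussed above.
broker.id=0
log.dirs=/data/kafka-logs1,/data/kafka-logs2
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka
listeners=PLAINTEXT://:9092
unclean.leader.election.enable=false
delete.topic.enable=true
log.retention.hours=168
log.retention.bytes=-1
min.insync.replicas=2
num.network.threads=3
num.io.threads=8
message.max.bytes=1000012
```

Note how min.insync.replicas=2 and unclean.leader.election.enable=false together implement the consistency measures from the features section, while the producer's acks=all supplies the other half of the durability guarantee.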

2. Rebalance analysis