Kafka

Distributed log system

I. Application scenarios

  1. Message queue – primarily used for system decoupling
  2. Collect user activity logs
  3. Summary of application monitoring data
  4. Big data streaming processing

II. Advantages

  1. High throughput: a single machine can handle 100K messages per second
  2. High performance: a single machine can serve thousands of clients with zero downtime and zero data loss
  3. Message persistence with O(1) time complexity
  4. Messages within a partition are delivered in order
  5. Offline and real-time data processing
  6. Online horizontal scaling

III. Characteristics

  1. Messages are pulled by consumers only; there is no push
  2. Consumer-side message retries and dead-letter queues are not supported – they must be implemented by the application itself
  3. Messages are managed by topic; a topic has multiple partitions, and a partition has multiple replicas
  4. Within a consumer group, a partition is consumed by at most 1 consumer, while 1 consumer can consume N partitions

IV. Basic concepts

1. Producers

(1) How messages are routed to a topic's partitions (see the sketch after this list)

  1. An explicitly specified partition
  2. Hash of the message key, modulo the number of partitions
  3. Round robin
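
For illustration, a minimal Java producer sketch of the three routing paths (the broker address, topic name, and keys are assumptions, not values from this document):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RoutingSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // 1. Explicitly specified partition: always goes to partition 0
                producer.send(new ProducerRecord<>("topic_1", 0, "k1", "explicit partition"));
                // 2. Key given, no partition: partition = hash(key) mod number of partitions
                producer.send(new ProducerRecord<>("topic_1", "k1", "routed by key hash"));
                // 3. No key, no partition: round robin across partitions
                producer.send(new ProducerRecord<>("topic_1", "round robin"));
            }
        }
    }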

(2) Batch sending conditions (a batch is sent when either condition is met; settings fragment after this list)

  1. Buffer size
  2. Message waiting time
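
These two conditions map to the producer settings below; a fragment meant to be added to the Properties in the sketch above (the values are illustrative, not defaults taken from this document):

    props.put("batch.size", 16384); // buffer size of one batch, in bytes; the batch is sent when full
    props.put("linger.ms", 5);      // the batch is also sent once a message has waited this long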

(3) How the broker's send acknowledgement (metadata) is returned (examples after this list)

  1. Blocking (synchronous)
  2. Callback (asynchronous)
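
A sketch of both styles, reusing the producer from the sketch above (checked-exception handling is omitted; RecordMetadata comes from org.apache.kafka.clients.producer):

    // Blocking: wait synchronously for the broker's metadata (partition and offset)
    RecordMetadata meta = producer.send(new ProducerRecord<>("topic_1", "k1", "v1")).get();
    System.out.println("stored at offset " + meta.offset());

    // Callback: the metadata (or the exception) is delivered asynchronously
    producer.send(new ProducerRecord<>("topic_1", "k1", "v2"), (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();                         // send failed
        } else {
            System.out.println("stored at offset " + metadata.offset());
        }
    });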

(4) Components (configuration fragment after this list)

  1. Interceptor: inspects or modifies message content before sending
  2. Serializer: converts keys and values from objects to byte arrays
  3. Partitioner: decides which partition a message is assigned to
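
These components are wired in through producer configuration; a fragment for the Properties above (the com.example class names are hypothetical placeholders):

    props.put("interceptor.classes", "com.example.MyProducerInterceptor"); // interceptor: inspect/modify records before sending
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // key serializer
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
    props.put("partitioner.class", "com.example.MyPartitioner");           // partitioner: choose the target partition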

2. Consumers

  1. Subscribes to one or more topics and reads messages in order
  2. The offset stored per consumer group distinguishes messages that have already been read
  3. A consumer group ensures that a partition is consumed by only one of its consumers, avoiding repeated consumption
  4. When a consumer fails, a rebalance is triggered and partitions are reallocated
  5. If the number of consumers exceeds the number of topic partitions, some consumers sit idle
  6. Adding consumers to a consumer group horizontally scales consumption capacity
  7. Consumer partition assignment policies (see the consumer sketch after this list)
    1. RangeAssignor [default]: first computes how many partitions each consumer should get (as evenly as possible), then assigns contiguous ranges
    2. RoundRobinAssignor: the partitions of all subscribed topics are assigned one by one in turn
    3. StickyAssignor: keeps the new assignment as close to the previous one as possible
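
A minimal consumer sketch tying these points together (broker address, group id, and topic are assumptions; StickyAssignor is chosen only to show how the assignment policy is configured):

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
            props.put("group.id", "demo-group");               // consumers sharing a group.id split the partitions
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Partition assignment policy (RangeAssignor is the default)
            props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("topic_1"));        // subscribe to one or more topics
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("partition=%d offset=%d value=%s%n",
                                record.partition(), record.offset(), record.value());
                    }
                }
            }
        }
    }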

3. Broker – an independent Kafka server

  1. One broker in the cluster is elected as the cluster controller – implemented with ZooKeeper ephemeral nodes
    1. Assigns partitions to brokers
    2. Monitors brokers

4. Topic

Producers send messages to Kafka and specify which topic each message belongs to

5. Partition

  1. Partitions allow storage capacity to scale horizontally
  2. The broker that handles read/write requests for a partition (i.e., hosts its leader replica) is called the partition leader
  3. Message order is guaranteed only within a partition, not across a topic – for a strict ordering guarantee, use a single partition

6. Replicas

Create data redundancy to improve disaster recovery capability

  1. Leader replica – all produce and consume requests go through this replica
  2. Follower replica – handles no requests; it exists only to be promoted to the new leader if the leader replica crashes
  3. ISR (In-Sync Replicas): replicas that are sufficiently synchronized with the leader replica
  4. OSR (Out-of-Sync Replicas): replicas that lag too far behind the leader replica
  5. AR (Assigned Replicas) = ISR + OSR

When the leader replica goes down, the old leader (once it recovers) discards its uncommitted data and catches up by pulling data from the new leader

Leader election picks the new leader from the ISR, which needs less redundancy than a majority (quorum) mechanism

unclean.leader.election.enable = true: allows the leader to be elected from replicas outside the ISR (may lose data)

With a majority (quorum) mechanism, tolerating one failure requires three servers and tolerating two failures requires five. With the ISR mechanism, tolerating one failure requires two servers and tolerating two failures requires three.

7. Offset

Storing offsets in Kafka (rather than ZooKeeper) is recommended, because ZooKeeper is not suited to high-concurrency writes

1. Partition

  1. HW (High Watermark): the offset up to which all ISR replicas have synchronized; consumers can pull messages only up to this offset
  2. LEO (Log End Offset): the offset at which the next message will be written

2. Consumer

Consumers commit their offsets to Kafka, where they are stored in the __consumer_offsets topic (50 partitions by default), which is maintained by Kafka itself

  1. Automatic commit [default]

    When poll() is called, the offsets of the messages returned by the previous poll are committed

    Messages are not lost, but repeated consumption can occur

  2. Manual commit

    1. Synchronous – hurts TPS – mitigation: commit once after several polls
    2. Asynchronous – commit failures are not automatically retried

[Recommended] Combine synchronous and asynchronous commits (see the sketch below)
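
A sketch of the recommended combination, assuming the consumer configuration from the earlier sketch with enable.auto.commit set to false (running and process() are hypothetical placeholders):

    props.put("enable.auto.commit", "false");        // take over offset management manually
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(List.of("topic_1"));
    try {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                process(record);                     // hypothetical business logic
            }
            consumer.commitAsync();                  // fast, but failed commits are not retried
        }
    } finally {
        try {
            consumer.commitSync();                   // final synchronous commit retries until it succeeds
        } finally {
            consumer.close();
        }
    }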

V. Scripts

  1. kafka-topics.sh: manage topics

    #List all topics
    kafka-topics.sh --list --zookeeper localhost:2181/myKafka
    
    #Create a topic with 1 partition and 1 replica (leader only, no followers), specifying the maximum message size and the log segment size
    kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_1 --partitions 1 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760
    
    #Create a topic with explicit replica placement: 3 partitions, each entry is "leader broker:follower broker"
    kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_03 --replica-assignment "0:1,1:0,0:1"
    #List topics
    kafka-topics.sh --zookeeper localhost:2181/myKafka --list
    
    #View details of the specified topic
    kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_1
    
    #Delete a specified topic
    kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_1
    
    #Modifies the topic message maximum size configuration
    kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576
    
    #Delete the topic message maximum size configuration
    kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01
    
    #Adding a Topic Partition
    kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 --partitions 2
  2. kafka-console-producer.sh: produce messages

    #Start a console producer
    kafka-console-producer.sh --topic topic_1 --broker-list localhost:9092
  3. kafka-console-consumer.sh: consume messages

    #Start a console consumer
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1
    
    #Start a console consumer and consume from the beginning
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1 --from-beginning
  4. kafka-consumer-groups.sh: manage consumer group offsets

    #View the consumption information of the specified Group
    kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group
  5. kafka-reassign-partitions.sh: reassign topic partitions

    # topics-to-move.json (topics whose partitions are to be moved)
    {"topics": [{"topic": "tp_re_01"}], "version": 1}
    
    #Read topics-to-move.json and generate a reassignment plan for Broker 0 and Broker 1
    kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --topics-to-move-json-file topics-to-move.json --broker-list "0,1" --generate
    # topics-to-execute.json (the generated plan)
    {"version":1,"partitions":[
      {"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs":["any"]},
      {"topic":"tp_re_01","partition":1,"replicas":[0],"log_dirs":["any"]},
      {"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]},
      {"topic":"tp_re_01","partition":3,"replicas":[0],"log_dirs":["any"]},
      {"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]}
    
    #Execute the partition plan in topics-to-execute.json
    kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file topics-to-execute.json --execute
    
    #Verify the partition plan in topics-to-execute.json
    kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file topics-to-execute.json --verify
    # increment-replication-factor.json
    {"version":1,"partitions":[
      {"topic":"tp_re_02","partition":0,"replicas":[0,1]},
      {"topic":"tp_re_02","partition":1,"replicas":[0,1]},
      {"topic":"tp_re_02","partition":2,"replicas":[1,0]}]}
    #Execute the replica assignment plan in increment-replication-factor.json
    kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file increment-replication-factor.json --execute
  6. kafka-run-class.sh: run utility classes to inspect log and index files

    #Dump the log file 00000000000000000000.log
    kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log | head
    
    #Dump the index file 00000000000000000000.index
    kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --print-data-log | head

VI. Message reliability

1. Send

  1. acks acknowledgement modes

    1. acks = 0: does not wait for any broker acknowledgement; the message is only written to the producer's buffer – retries has no effect

    2. acks = 1: acknowledged once the leader replica has stored the message – if the followers have not yet synchronized and the leader fails, the message is lost

    3. acks = all [default]: acknowledged only after all ISR replicas have the message – messages are not lost

  2. retries: number of retries when sending a message fails (see the settings fragment after this list)

    1. To keep strict ordering within a partition when sending asynchronously with retries, set max.in.flight.requests.per.connection = 1
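
A reliability-oriented producer configuration fragment reflecting the points above (a sketch for the Properties in the earlier producer example; exact defaults vary by client version):

    props.put("acks", "all");                                // wait for all ISR replicas to acknowledge
    props.put("retries", Integer.MAX_VALUE);                 // retry transient send failures
    props.put("max.in.flight.requests.per.connection", 1);   // keep strict ordering within a partition
    props.put("enable.idempotence", true);                   // broker de-duplicates retried batches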

VII. Rebalancing

During a rebalance, consumption is unavailable; this can last from minutes to hours, so rebalances should be avoided in production environments

1. Trigger conditions

  1. The membership of the consumer group changes
  2. The number of topic partitions changes – Kafka only supports adding partitions
  3. The set of subscribed topics changes – e.g., when consumers subscribe to topics via a regular expression

2. Avoiding rebalances (settings fragment after this list)

  1. Heartbeat timeout: session.timeout.ms, e.g. 6 s
  2. Heartbeat interval: heartbeat.interval.ms, e.g. 2 s
  3. Poll interval: max.poll.interval.ms – set to the longest expected processing time plus 1 minute
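
The corresponding consumer settings as a configuration fragment (the values are the example figures above; tune them to the workload):

    props.put("session.timeout.ms", 6000);      // consumer is considered dead after 6 s without heartbeats
    props.put("heartbeat.interval.ms", 2000);   // send a heartbeat every 2 s (about 1/3 of the session timeout)
    props.put("max.poll.interval.ms", 300000);  // longest allowed gap between polls before a rebalance is triggered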

3. Process

  1. JOIN: all members of the consumer group request to join the group, and one consumer is chosen as the group leader
  2. SYNC: the leader draws up an assignment plan and sends it to the Group Coordinator (the broker hosting the leader replica of the group's __consumer_offsets partition), which then distributes the plan to the consumers (see the listener sketch below)
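
On the client side the hand-over can be observed with a ConsumerRebalanceListener; a sketch that commits offsets before partitions are revoked (reusing the consumer from the earlier sketch; also needs java.util.Collection, org.apache.kafka.clients.consumer.ConsumerRebalanceListener, and org.apache.kafka.common.TopicPartition):

    consumer.subscribe(List.of("topic_1"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            consumer.commitSync();                             // flush offsets before partitions move away
        }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("assigned: " + partitions);     // the plan received after the SYNC phase
        }
    });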

4. Assign policies

  1. RangeAssignor [default]
  2. RoundRobinAssignor
  3. StickyAssignor

VIII. Logs

1. Log files

  1. Purpose: store message content
  2. Each partition has its own log files – .index, .timeindex, .log, … – files sharing the same base name form a LogSegment
  3. Logs are append-only and written sequentially
  4. Purpose of LogSegments: keep individual log files small and make it fast to locate a message
  5. Only the active LogSegment can be written to; the others are read-only
  6. Each segment file is named after the offset of its first message
  7. Record fields
    1. offset: message offset
    2. position: physical position of the message in the file
    3. CreateTime: timestamp – do not set this manually when producing messages, otherwise messages may appear out of order
    4. magic: message format version
    5. compresscodec: compression codec
    6. CRC: checksum of the record

2. Index file contents

  1. Records the mapping from message offset to the physical position of the message in the log file
  2. It is a sparse index: not every record gets an index entry

3. Log rolling (splitting)

Trigger conditions

  1. The log segment file exceeds log.segment.bytes: 1 GB by default
  2. The index file exceeds log.index.size.max.bytes: 10 MB by default
  3. The segment is older than log.roll.ms or log.roll.hours: 7 days by default

How rolling works

When an index file is created, Kafka preallocates log.index.size.max.bytes (10 MB by default) of space and trims the file to its actual size when the segment is rolled

Purpose: simplify the code logic

4. Log cleanup (see the sketch after this list)

  1. Deletion [default]
    1. By time: log.retention.ms, 7 days by default, judged by the largest timestamp in the LogSegment
    2. By size: log.retention.bytes
    3. By log start offset
  2. Compaction – for records with the same key but different values, only the latest version is kept – suits state recovery / disaster recovery for real-time computing
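
Compaction is a per-topic setting; a sketch that switches a topic to compaction with the AdminClient (broker address and topic name are assumptions):

    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class EnableCompaction {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "topic_1");
                // Replace time/size based deletion with log compaction for this topic
                Collection<AlterConfigOp> ops = List.of(
                        new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET));
                admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
            }
        }
    }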

5. Hard disk storage

  1. Zero copy: avoids switching between kernel mode and user mode and eliminates unnecessary data copies
  2. Page cache: caches disk data in memory to reduce disk I/O
  3. Sequential writes: further optimized with read-ahead (prefetching disk blocks into memory) and write-behind (merging small writes)

IX. Transactions

  1. A group of messages becomes visible to consumers, or invisible, atomically
  2. Typical scenario: consume from one topic, process, then produce to another topic
  3. The __transaction_state topic stores transaction state
  4. The broker hosting the corresponding __transaction_state partition acts as the transaction coordinator, responsible for initiating, committing, and rolling back transactions
  5. An epoch ensures that a transactional.id has only one active producer at a time

1. Configuration

Consumers

  1. Disable enable.auto.commit and do not call commitSync() or commitAsync() manually
  2. Set isolation.level to read_committed (or read_uncommitted)

Producers (see the transactional sketch after this list)

  1. Configure transactional.id
  2. Set enable.idempotence = true
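
A consume-transform-produce sketch of these settings (topic names, the transactional.id, and the helpers transform() and currentOffsets(), which builds a Map<TopicPartition, OffsetAndMetadata> from the polled records, are assumptions; the consumer is configured as described above, with read_committed and auto-commit disabled):

    Properties pprops = new Properties();
    pprops.put("bootstrap.servers", "localhost:9092");                    // assumed broker address
    pprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    pprops.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    pprops.put("transactional.id", "tx-demo-1");                          // one active producer per transactional.id (epoch fencing)
    pprops.put("enable.idempotence", true);

    KafkaProducer<String, String> producer = new KafkaProducer<>(pprops);
    producer.initTransactions();

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        if (records.isEmpty()) continue;
        producer.beginTransaction();
        try {
            for (ConsumerRecord<String, String> record : records) {
                producer.send(new ProducerRecord<>("output_topic", record.key(), transform(record.value())));
            }
            // The consumed offsets are committed inside the same transaction, not by the consumer
            producer.sendOffsetsToTransaction(currentOffsets(records), consumer.groupMetadata());
            producer.commitTransaction();                                 // output and offsets become visible together
        } catch (Exception e) {
            producer.abortTransaction();                                  // read_committed consumers never see the batch
        }
    }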

2. Failure handling

If the producer times out or receives an error response when sending BeginTransaction(TxId), it retries BeginTransaction(TxId); beware of idempotency

X. Duplicate messages

1. Producer stage

Cause: the broker's acknowledgement was not received correctly, so the message is resent

Solutions

  1. Enable idempotency
    1. enable.idempotence = true
    2. acks = all
    3. retries > 1
  2. Alternatively, use acks = 0 with no retries; messages may be lost – for scenarios where throughput matters more than data loss, e.g., log collection

2. Consumer stage

Cause: the message was consumed, but the offset was not committed to the broker in time

Solution

Duplicate delivery cannot be eliminated at the source; the consumer side must process messages idempotently (see the sketch below)
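
A minimal sketch of consumer-side idempotent processing, assuming each record's key carries a unique business id (in production the processed-id set would live in a database or cache rather than in memory; process() is a hypothetical placeholder):

    Set<String> processedIds = new HashSet<>();    // seen business ids (in-memory for illustration only)

    for (ConsumerRecord<String, String> record : records) {
        String id = record.key();                  // assumed: the key is a unique business id
        if (!processedIds.add(id)) {
            continue;                              // duplicate delivery: already handled, skip
        }
        process(record);                           // hypothetical business logic
    }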