What is Kafka?
Apache Kafka® is a distributed streaming platform.
That is the introduction from the official website. Compared with a typical message processing system, the differences are as follows:
- Kafka is a distributed system that is easy to scale out
- It provides high throughput for both publishing and subscribing
- It supports multiple subscribers and automatically rebalances consumers when one fails
- It persists messages to disk
Comparison with other messaging systems:
| Metric | Kafka | ActiveMQ | RabbitMQ | RocketMQ |
|---|---|---|---|---|
| Background | A high-performance, distributed messaging system developed by LinkedIn for log collection, streaming data processing, and online/offline messaging | An open-source, message-oriented middleware (MOM) that implements the JMS 1.1 specification, providing efficient, scalable, stable, and secure enterprise-level messaging | An open-source implementation of the Advanced Message Queuing Protocol (AMQP), written in Erlang | An open-source distributed messaging middleware developed by Alibaba in 2012; donated to the Apache Foundation and became an Apache Incubator project in November 2016 |
| Language | Java and Scala | Java | Erlang | Java |
| Protocol support | Its own custom protocol | JMS | AMQP | JMS, MQTT |
| Persistence | Supported | Supported | Supported | Supported |
| Producer fault tolerance | `acks=0`: the producer does not wait for any broker response. `acks=1`: the send succeeds once the partition leader receives the message. `acks=all`: the send succeeds only after all in-sync replicas receive the message (the safest mode) | Retries after a failed send | Has an ACK model, which may duplicate messages; a transaction model guarantees full consistency | Similar to Kafka |
| Throughput | Very high: messages are batched internally, zero-copy is used, and storage and retrieval are sequential batch operations on local disk with O(1) access, so message processing is very efficient | - | Lower than Kafka; the two start from different goals: RabbitMQ emphasizes reliable delivery and transactions rather than batching, and storage can be memory or disk depending on reliability requirements | Kafka performs better with a small number of topics; RocketMQ performs better with a large number of topics |
| Load balancing | Uses ZooKeeper to manage brokers and consumers in a cluster; topics are registered on ZooKeeper, and through its coordination mechanism the producer holds broker metadata for each topic, sending either randomly, round-robin, or to a semantically chosen partition | - | Requires a separate load balancer | NameServer performs load balancing |
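The `acks` levels in the table above can be illustrated with a plain `java.util.Properties` object; the config key names (`bootstrap.servers`, `acks`, `retries`) are Kafka's real producer keys, but `AcksDemo`, `producerProps`, and the broker address are illustrative placeholders, a minimal sketch rather than production configuration:

```java
import java.util.Properties;

public class AcksDemo {
    // Build producer settings for a given durability level: "0", "1", or "all".
    static Properties producerProps(String acks) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("acks", acks);   // "0" = fire-and-forget, "1" = leader ack, "all" = full ISR ack
        props.put("retries", "3"); // retry transient send failures
        return props;
    }

    public static void main(String[] args) {
        // "all" is the safest setting: the send succeeds only after all
        // in-sync replicas have the message.
        System.out.println(producerProps("all").getProperty("acks")); // prints "all"
    }
}
```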
Architecture diagram:
Example:
Producer:
```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
                        new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }

    class DemoCallBack implements Callback {
        private final long startTime;
        private final int key;
        private final String message;

        public DemoCallBack(long startTime, int key, String message) {
            this.startTime = startTime;
            this.key = key;
            this.message = message;
        }

        /**
         * Called when the record sent to the server has been acknowledged.
         * Exactly one of the two arguments will be non-null.
         *
         * @param metadata  metadata for the sent record (partition and offset); null if an error occurred
         * @param exception the exception thrown while processing the record; null if no error occurred
         */
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                System.out.println("message(" + key + ", " + message + ") sent to partition("
                        + metadata.partition() + "), offset(" + metadata.offset() + ") in "
                        + elapsedTime + " ms");
            } else {
                exception.printStackTrace();
            }
        }
    }
}
```
Consumer:
```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;

    public Consumer(String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

    @Override
    public void run() {
        // Subscribe once, outside the poll loop.
        consumer.subscribe(Collections.singletonList(this.topic));
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value()
                        + ") at offset " + record.offset());
            }
        }
    }
}
```
Properties:
```java
public class KafkaProperties {
    public static final String TOPIC = "topic1";
    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
    public static final int CONNECTION_TIMEOUT = 100000;
    public static final String TOPIC2 = "topic2";
    public static final String TOPIC3 = "topic3";
    public static final String CLIENT_ID = "SimpleConsumerDemoClient";

    private KafkaProperties() {}
}
```
Related terms:
- Producer: a message producer; the client that sends messages to the broker
- Consumer: a message consumer; the client that reads messages from the broker. Within a group, the number of active consumers is at most the number of partitions
- Broker: a message-middleware processing node. One Kafka node is one broker; one or more brokers form a Kafka cluster
- Topic: Kafka categorizes messages by topic; every message published to a Kafka cluster must specify one
- Partition: a topic can be divided into multiple partitions, and each partition is internally ordered. By default, Kafka routes a message to a partition by hash(key) % number of partitions
- ConsumerGroup: each consumer belongs to a specific consumer group. A message can be delivered to multiple consumer groups, but within one group only a single consumer will consume it
Topic and Partition
- Messages sent to a topic are routed to a partition according to a rule (by default, the hash of the key modulo the number of partitions; you can also supply a custom partitioner)
- Each partition is an ordered, immutable message log that can only be appended to. Each message in a partition is assigned a sequence number called the offset, which is unique within that partition
- The only metadata a consumer holds is the offset, i.e. its position in the log. The offset is controlled by the consumer: normally it advances linearly as the consumer consumes messages
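The default routing rule above can be sketched in plain Java. Note this is an illustrative stand-in, not Kafka's actual partitioner, which hashes the serialized key bytes with murmur2 rather than calling `hashCode()`; `PartitionDemo` and `partitionFor` are made-up names:

```java
public class PartitionDemo {
    // Simplified stand-in for Kafka's default partitioner:
    // route a keyed message to hash(key) % numPartitions.
    static int partitionFor(Object key, int numPartitions) {
        // Math.abs guards against negative hash codes; Kafka itself
        // uses murmur2 on the serialized key bytes instead of hashCode().
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // The same key always lands in the same partition,
        // which is what gives Kafka per-key ordering.
        System.out.println(partitionFor("user-42", numPartitions)
                == partitionFor("user-42", numPartitions)); // prints "true"
    }
}
```

Because the mapping depends only on the key and the partition count, all messages with the same key stay in one partition and are therefore consumed in order.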
Consumer and Partition
- Generally speaking, messaging models fall into two types: queues and publish-subscribe. In the queue model, a group of consumers pulls from one end of the queue, and each message, once consumed, is gone. In the publish-subscribe model, a message is broadcast to all consumers, and any receiving consumer can process it. Kafka abstracts over both with consumer groups
- Consumer groups: each group contains several consumers. If all consumers are in one group, it behaves as a queue model; if each consumer is in its own group, it behaves as a publish-subscribe model
- Data in a partition will only be processed by one consumer in a group; other consumers in the same group will not process it again
- The number of consumers in a group should be at most the number of partitions. If there are more consumers than partitions, the extra consumers receive no messages and are simply wasted
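The partition-to-consumer rule above can be sketched as a simple round-robin assignment. This mimics, but is not, Kafka's actual group assignor; `AssignDemo` and `assign` are illustrative names:

```java
import java.util.ArrayList;
import java.util.List;

public class AssignDemo {
    // Round-robin assignment: partition p goes to consumer p % consumerCount.
    // Returns one partition list per consumer.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(p % consumers).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions shared by 4 consumers: the 4th consumer
        // gets no partitions and sits idle.
        List<List<Integer>> a = assign(3, 4);
        System.out.println(a.get(3).isEmpty()); // prints "true"
    }
}
```

With 3 partitions and 4 consumers the last consumer is assigned nothing, which is exactly the "extra consumers receive no messages" case from the list above.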
Finally
Welcome to follow my official account, where I occasionally share thoughts on programming, investing, and life 🙂