Part I: Getting to know Kafka

Apache Kafka is a distributed streaming platform. What the hell does that even mean? A streaming media platform has three key functions: publish and subscribe record streams, similar to message queues or enterprise messaging systems. Store record streams in a fault-tolerant persistent manner. Record the processing flow as it occurs.

Kafka is typically used for two broad applications:

1. Establish real-time streaming data pipelines to reliably obtain data between systems or applications

2. Build real-time streaming applications that transform or react to data streams

To understand how Kafka does this, let’s explore Kafka’s capabilities from the bottom up.

First, a few concepts:

Kafka runs as a cluster on one or more servers that span multiple data centers.

A Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp.

Kafka has four core apis:

The Producer API allows applications to publish streams of records to one or more Kafka topics.

The Consumer API allows applications to subscribe to one or more topics and process the flow of records generated for them.

The Streams API allows an application to act as a stream processor, consuming input Streams from one or more topics and generating output Streams into one or more output topics, effectively transforming input Streams into output Streams.

The Connector API allows you to set up and run reusable producers or consumers that connect Kafka themes to existing applications or data systems. For example, a connector for a relational database might capture every change to a table.

Topics and Logs

Let’s dive into the topic that the core abstraction Kafka provides for a record stream.

The topic is the category or feed name of the record publication. Topics in Kafka are always multi-user, that is, topics can have one or more users subscribing to the data.

For each topic, the Kafka cluster maintains a partition log that looks like this:

Each partition is an ordered, immutable sequence of records that is continuously attached to the structured commit log. Records in each partition are assigned a sequential ID number called an offset that uniquely identifies each record within the partition.

All published records persist in the Kafka cluster, regardless of whether a configurable retention cycle consumption has been used. For example, if the retention policy is set to two days, a record can be consumed for two days after publication, after which it will be discarded to free up space. Kafka’s performance is effectively constant with respect to data size, so storing data for a long time is not a problem.

In fact, the only metadata retained on a per-consumer basis is the user’s offset or location in the log. This offset is controlled by the consumer: normally, the consumer will linearly offset its offset as it reads the record, but in fact, since that position is controlled by the consumer, it can consume the records in any order it likes. For example, a consumer can reset to an old offset to reprocess past data, or skip the most recent record and start consuming from “now.”

This combination of characteristics means that Kafka consumers are very cheap and can come and go without much impact on clusters or other consumers. For example, you can use our command-line tools to “tail” the content of any topic without changing the content consumed by any existing consumers.

Partitions in logs have many uses. First, they allow logs to grow beyond a size appropriate for a single server. Each individual partition must fit the server hosting it, but a topic can have many partitions, so it can handle any amount of data. Second, they’re more of a parallel unit at this point.

Distribution

Log partitions are distributed among servers in a Kafka cluster, and each server processes data and requests shared partitions. Each partition is replicated on multiple servers that can be configured for fault tolerance.

Each partition has one server acting as the “leader” and zero or more servers acting as the “followers.” The leader handles all read and write requests to the partition, while the follower passively copies the leader. If the leader fails, one of the followers automatically becomes the new leader. Each server acts as a leader for some partitions and a follower for others, so the load within the cluster is well balanced.

Geo-Replication

Kafka MirrorMaker provides geographic replication support for your cluster. With mirroring machines, messages are replicated across multiple data centers or cloud areas. You can use this backup and restore in active/passive scenarios; Or in activity/activity scenarios, bring data closer to the user, or support data location requirements.

Producers

Producers publish data to topics of their choice. The producer is responsible for choosing which partition of a record to assign to within a topic. This can be done in a circular fashion, just to balance the load, or it can be done according to some semantically partitioning function (for example, based on some key in the record).

Consumers

Consumers identify themselves with a consumer group name, and each record published to a topic is passed to a consumer instance in each subscriber consumer group. The consumer instance can be in a separate process or on a separate machine.

If all consumer instances have the same consumer group, the record will effectively load balance on the consumer instance.

If all consumer instances have different consumer groups, then each record will be broadcast to all consumer processes.

A two-server Kafka cluster hosts four partitions (P0 to P3) with two consumer populations. Consumer group A has two consumer instances and group B has four.

More often, however, we find that topics have a small number of consumer groups, each with a “logical user.” Each group consists of a number of extensibility and fault tolerance consumer instances. This is just publish-subscribe semantics, where subscribers are a set of consumers rather than a single process.

Consumption is achieved in Kafka by dividing the partitions in the log by consumer instances so that each instance is the only consumer of a “fair share” of the partitions at any point in time. Membership retention in this group is handled dynamically by the Kafka protocol. If new instances join the group, they will take over some partitions from other members of the group; If one instance dies, its partitions are distributed to the remaining instances.

Kafka only provides the overall order of records within a partition, not between different partitions within a topic. Each partition sort combined with the ability to partition the key data is sufficient for most applications. However, if you need a total ordering of records, you can do it with a topic with only one partition, but that would mean only one consumer process per consumer group.

Multi-tenancy technology

Kafka can be deployed as a multi-tenant solution. Multi-tenancy is achieved by configuring which topics can generate or consume data. There is also operational support for quotas. Administrators can define and enforce quotas on requests to control the proxy resources used by clients.

ensure

At a high level Kafka provides the following guarantees:

Messages sent by producers to a particular topic partition are added in the order they were sent. That is, if record M1 is sent by the same producer as record M2, M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.

A user instance views records in the order they are stored in the log.

For topics with replication factor N, we will tolerate up to n-1 server failures without losing any records submitted to the log.

Kafka ensures that messages within a partition are ordered, meaning that producers send messages in a certain order, brokers write them to partitions in the same order, and consumers read them in the same order

###### Kafka as a messaging systemCopy the code

How does Kafka’s flow concept compare to traditional enterprise messaging systems?

There have traditionally been two modes of messaging: queuing and publish-subscribe. In the queue, the consumer pool can be read from the server, and each record goes to one of them; In a publish subscription, records are broadcast to all consumers. Both models have their advantages and disadvantages. The advantage of queuing is that it allows you to divide the processing of the data across multiple consumer instances, which allows you to scale the processing. Unfortunately, once a process has read its data, the queue is no longer multi-user. Publish subscriptions allow you to broadcast data to multiple processes, but because each message flows to each subscriber, scaling is not possible.

Kafka’s concept of consumer groups encapsulates these two concepts. Like queues, consumer groups allow you to divide processing on a collection of processes (members of a consumer group). Like publishing subscriptions, Kafka allows you to broadcast messages to multiple user groups.

The advantage of the Kafka model is that each topic has these properties, it can be scaled, and it is also multi-user, with no need to select one or the other.

Kafka has stronger sorting guarantees than traditional messaging systems.

Traditional queues retain the order of records on the server, and if multiple users consume from the queue, the server distributes records in the order they are stored. However, although the server distributes records sequentially, records are delivered to consumers asynchronously, so they may arrive unordered among different consumers. This effectively means that the order of records is lost in the presence of parallel consumption. Messaging systems often work around this, with an “exclusive consumer” concept that allows only one process to consume from the queue, but this of course means that there is no parallelism in processing.

Kafka does better. By having the concept of parallel partitioning in a topic, Kafka can provide sorting assurance and load balancing in a pool of consumer processes. This is done by assigning partitions in a topic to consumers in a consumer group, so that each partition is completely consumed by one consumer in that group. By doing so, we ensure that the consumer is the only reader for the partition and consumes the data sequentially. With many partitions, this still balances the load on many consumer instances. Note, however, that consumer groups cannot have more consumer instances than partitions.

Kafka serves as a storage system

Any message queues that allow messages to be published and the decoupling between them effectively act as a storage system for flying messages. Kafka is different in that it is a very good storage system.

Data written to Kafka is written to disk and copied for fault tolerance. Kafka allows manufacturers to wait for confirmation that a write is considered complete until it has been copied completely, and guarantees that it will persist even if the server writes fail.

The disk structure Kafka uses the scale well Kafka will perform the same whether you have 50 KB or 50 TB of persistent data on the server.

As a result of taking storage seriously and allowing clients to control where they read, you can think of Kafka as a dedicated distributed file system for high-performance, low-latency commit log storage, replication, and propagation.

Kafka for stream processing

It is not enough to just read, write and store the data stream; the goal is to achieve real-time processing of the flow.

In Kafka, the stream processor takes a continuous stream of data from an input topic, performs some processing on that input, and produces a continuous stream of data to an output topic.

For example, a retail application might receive input streams for sales and shipments and output streams for reordering and price adjustments calculated from that data.

You can use the producer and consumer apis directly for simple processing. However, for more complex transformations, Kafka provides a fully integrated streaming API. This allows you to build non-trivial processing applications that can evaluate aggregations in a stream or join the stream together.

This tool helps solve the challenges faced by such applications: processing unordered data, reprocessing input, code changes, performing state calculations, and so on.

The stream API builds on the core primitives provided by Kafka: it uses the producer and consumer apis for input, uses Kafka for state storage, and uses the same group mechanism for fault tolerance in the stream processor instances.

Put the pieces together

This combination of messaging, storage, and streaming may seem unusual, but it is critical to Kafka’s role as a streaming platform.

Distributed file systems (such as HDFS) allow static files to be stored for batch processing. In effect, such a system allows the storage and processing of historical data from the past.

Traditional enterprise messaging systems allow the processing of future messages that arrive after a subscription. Applications built in this way process future data upon arrival.

Kafka combines these two capabilities, and the combination is crucial for Kafka as a streaming application platform and as a streaming data pipeline.

By combining storage and low-latency subscriptions, streaming applications can handle past and future data in the same way. That is, a single application can process historical, stored data, rather than end up when the last record is reached, and it can hold processing until future data arrives. This is the general concept of flow processing involving batch and message-driven applications.

Similarly, for streaming data pipelines, subscribing to live events makes it possible to use Kafka for very low-latency pipelining; However, the ability to store data reliably allows it to be used for critical data where delivery or consolidation of data must be guaranteed. N An offline system that periodically loads data or maintains data for a long time. The stream processing facility makes it possible to transform data upon arrival.

Part TWO: Advances in Kafka technology

Producer generalization

The main steps for Kafka to send a message start with creating a ProducerRecord object, which needs to contain the target topic and the content to send. We can also specify key partitioning so that when sending a ProducerRecord object, the producer first serializes the key and value object into a byte array so that they can be transmitted over the network. Next, the data is passed to the divider, and if a partition was previously specified in the ProducerRecord object, the divider does nothing but return the specified partition. If no partition is specified, the divider selects a partition based on the key of the ProducerRecord object. Once the partition is selected, the producer knows which topic and partition to send the message to. This record is then added to a record batch in which all messages are sent to the same topic and partition. A separate thread is responsible for sending these record batches to the appropriate broker. The server returns a response when it receives these messages. If the message is successfully written to Kafka, a RecordMetaData object is returned containing the subject and partition information, as well as the offset recorded in the partition. If the write fails, an error is returned. The producer tries to resend the message after receiving an error and returns an error message if it fails again after several attempts.

Create a Kafka producer

To write a message to Kafka, first create a producer object and set some properties. Kafka producers have three mandatory properties.

Bootstrap.servers key.serializer value. Serializer Other default properties: acks buffer.memory compression.type retries batch.size linger.ms client.id max.in.flight.requests.oer.connection The timeout. Ms, request. A timeout. Ms and metadata. The fetch. The timeout. Max. Ms block. Ms Max. Request. The size Resive. Buffer. Bytes and the send buffer. The bytes

The following code snippet demonstrates how to create a new producer, specifying only the required properties and using the default Settings

private Properties kafkaProps = new Properties(); 

kafkaProps.put("bootstrap.servers"."broker1:9092,broker2:9092");

kafkaProps.put("key.serializer"."org.apache.kafka.common.serialization.StringSerializer");

kafkaProps.put("value.serializer"."org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<String, String>(kafkaProps);
Copy the code

The main point of the code snippet above is to create a New Properties object because we’re going to define keys and values as strings, so use the built-in StringSerializer Here we create a new producer object, set the appropriate types for the keys and values, and pass it the Properties object

With the producer object instantiated, the next step is to start sending messages. There are three main ways to send messages,

Fire-and-forget: We send a message to the server, but we don’t care if it arrives. Most of the time the message will arrive because Kafaka is highly available and the producer will automatically try to resend it, but sometimes some messages will be lost using this method and send() synchronously: We send the message using send(), which returns a Future object and calls get() to wait to see if the message was successfully sent. Asynchronously send(): We send the message using Send () and specify a callback function that the server calls when it returns a response

The examples above all use single threads, but actually producers can use multiple threads to send messages and they can start with a single consumer and a single thread. If higher throughput is required, the number of threads can be increased while the number of producers remains the same. If that’s not enough, you can increase the number of producers

Send a message to Kafka

The simplest way to send a message is as follows

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry":"Precision Products"."France");
try{
     proucer.send(record);
} catch (Exception e){
         e.printStackTrance();
}

Copy the code

The producer’s send() method takes the ProducerRecord object as an argument, so we create a ProducerRecord object first. ProducerRecord has several constructors, only one of which is used here. It needs the name of the target topic and the key and value object to send, which are all strings. The freshness of the key and value objects must match the producer object of the serializer

We use the producer’s send() method to send the ProducerRecord object. The send() method returns a Future object containing RecordMetadata, but we ignore the return value so we don’t know if the message was sent successfully. If we don’t care about the result, we can Use this sending mode. Such as logging Twitter messages, or less important application logs

The synchronous

The simplest way to send messages synchronously is as follows

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry"."Precision Products"."France");

try{
  producer.send(record).get();
} catch (Exception e){
       e.printStackTrance();
}
Copy the code

The producer.send() method returns a Future object and then calls the Future object’s get() method to wait for Kafka’s response. If the server returns an error, the get() method throws an exception. If there are no errors, we’ll get a RecordMetadata object that we can use to get the offset of the message if there are any errors before or during sending the data. For example, if the broker returns an exception that does not allow retransmission of messages or has exceeded the number of retransmissions, it will throw an exception. We simply print out the exception message

Asynchronous send

Assume that a message takes 10ms to and from the application to the Kafka cluster. If you wait for a response after each message is sent, it takes 1 second to send 100 messages, but if you just send messages without waiting for a response, it takes much less time to send 100 messages. Most of the time, we don’t need to wait for a response, although Kafka sends back the target subject, partition information, and offsets of the message, which are not required for the sending application. However, in the event of a message delivery failure, we need to throw an exception, log the error, or write the message to an error message file

In order to handle exceptions while sending messages asynchronously, producers provide callback support. Here is an example of how to use callbacks

private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata,Exception e){
if(e ! = null){ e.prinStackTrace(); } } } ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry"."Biomedical Materials"."USA");

producer.send(record,new DemoProducerCallback());
Copy the code

In order to use the Callback, the need to implement org. Apach. Kafka. Clients. Producer. The Callback interface classes, this interface is only one onCompletion method If Kafka return an error onCompletion method will throw a non-empty (non The null exception passes in a callback object when a message is sent

Consumers (KafkaConsumer)

Before we look at how to read messages from Kafaka, let’s first look at the concept of consumers and consumer groups Suppose we have an application that reads messages from a Kafka topic, validates them, and then stores them. The application needs to create a consumer object, subscribe to the topic and start receiving messages, then validate the messages and save the results, after a while What if the producer lets the topic write messages faster than the application can validate the data? Just as multiple producers can write messages to the same topic, we need to use multiple consumers to read messages from the same topic, diverting the messages. Kafka consumers belong to consumer groups. Each consumer in a group subscribes to the same topic. Each consumer receives messages from a subset of the topic.

Consumer group and partition rebalancing

Consumers in the group read the topic partition together, and when a new consumer joins the group, he reads the message that was read by other consumers. When a consumer is shut down or crashes, he leaves the group, and partitions that were read by him are read by other consumers in the group. Partition reallocation occurs when the topic changes, such as when an administrator adds a new partition. The transfer of ownership of a partition from one consumer to another is called a rebalancing how does a rebalancing occur? Consumers maintain their affiliation with the group and their ownership of the partition by sending heartbeats to the group coordinator’s broker. As long as consumers can send heartbeats at regular intervals, they are considered active and still reading messages for the partition. Consumers send heartbeats when they poll for messages or commit offsets. If the consumer stops sending heartbeats long enough, the session expires, and the group coordinator thinks he is dead, a rebalancing is triggered.

Create Kafka consumers
{
   Properties props = new Properties();
   props.put("bootstrap.servers"."broker1:9092,broker2:9092");
   props.put("group.id"."CountryCounter");
   props.put("key.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
   props.put("value.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
}
Copy the code
Subscribe to the topic
consumer.subscribe(Collection.singletonList("customerCountries")); / / the subject name"customerCountries"
Copy the code
Consumer configuration

fetch.min.bytes fetch.max.wait.ms max.partition.fetch.bytes session.timeout.ms auto.offset.reset enable.auto.commit partition.assignment.strategy client.id max.poll.records receive.buffer.bytes/send.buffer.bytes

Here the basic content of Kafka has been introduced, if you want to in-depth understanding of these is far from enough here can recommend several books to everyone if you want to have a deep understanding of Kafka as a whole can read <

> required reading followed by <

> these two books read a few books on OK Finally you can also read <

> do not recommend reading of course the official website read English document is the best