Blog.csdn.net/beitiandiju…
Kafka.apache.org/documentati… \
Description:
Some proper nouns in the original are not translated:
kafka
topic
partition
consumer
producer
server
client
high-level
\
\
1, start
1.1 Introduction Kafka provides distributed, partitioned, and backup log submission services. It is also a messaging system with a unique design. So what does that mean? First, let’s review the basic terms of messaging: \
-Kafka maintains feed of messages categorized by topics
– Producers publish messages
– Consumer subscribes to Topics and processes the feed under Topics
– Run in a cluster and can consist of one or more servers called brokers
Therefore, at a high level, producer sends messages to the Kafka cluster over the network, and the Kafka cluster can provide these messages to consumers, as shown in the figure below:
The communication between clients and Servers is accomplished using the simple, high-performance, language-independent TCP protocol. Java clients are not only officially available, but also in many other languages. \
Switchable viewer and Logs
Let’s take a closer look at Kafka’s high-level abstractions – topics can be thought of as different categories of message flows. Different information bases are classified or summarized by different topics, and then the producer sends messages of different categories to different topics. For each topic, the Kafka cluster maintains a partition log: as shown in figure: \
\
As you can see in the figure above, the message sequence in each partition is ordered and cannot be changed once written. It can only be appended continuously at the end of the partition. Different messages in the same partition are identified by a continuous number, called offset, which represents the order in which messages are written.
\
The Kafka cluster can save all published messages – whether or not messages consumed—- the saving time is configurable. For example, if the log retention time is set to two days, the log is available for consumption for two days from the time the log is written to Kafka, after which the message is deleted to free up space. As a result, Kafka can store large amounts of data efficiently and consistently.
In fact, each consumer needs to save only one piece of metadata, called “offset,” which is mainly used to record the current consume location in the log. Offset is maintained by the consumer, not the Kafka cluster: Normally, the offset increases linearly as the consumer reads the message, as if the offset only passively follows the consumer, but in reality, the offset is completely controlled by the consumer and the consumer can consum the message from any position it likes. For example, the consumer can reset offset to the previous value and re-consum the data.
\
These characteristics together explain: Kafka consumers consume less resources —- Consumers can be easily read repeatedly without affecting the cluster and other consumers. For example, you can use the command line tools provided by Kafka to continuously read the latest messages on any topic without affecting the consumption behavior of any other consumers.
\
Partitioning the logs into different partitions serves the following purposes: First, the storage capacity of multiple partitions is much greater than that of a single server, but the storage capacity of each partition is the storage capacity of the server where it is located, that is, the data of the same partition of the same topic can only be stored on the same server. This means that data from the same partition within the same topic cannot be stored on two servers at the same time, but the same topic can contain many partitions. Theoretically, you can increase the number of partitions by increasing the number of servers. Second, the partitions mechanism improves the parallel processing capability, but not at all (multiple consumers can consume different partitions, or different consumers can consume the same partition, Offset is controlled by the consumer.
Distribution – distributed
\
Log partitions are distributed among servers in a Kafka cluster, and each server can process data and share requests for partitions. Each partition can be backed up, and the number of backups can be configured to improve fault tolerance.
\
Each partitions has a “leader” server and zero or more “Followers” servers; The Leader processes all read and write requests to the partition, while the follower only backs up the Leader. If the leader fails, one of the “followers” will automatically become the new leader. Multiple partitions are stored on each server, which acts as both the “leader” of some of them and the “follower” of the rest, thus balancing cluster load.
\
Producers – Producers
\
Producers choose to publish data to some topics. Producer is responsible for distributing a message to a partition under the topic name. For load balancing purposes, messages can be distributed either in a looping manner or by performing distribution functions based on certain keywords in the message. This is not just using partitoning in seconds or less.
\
Consumers, Consumers
\
Traditional message system adopts two modes: queuing and publish-subscribe. In the queue pattern, many consumers may read messages from the same server, and each message is consumed by a consumer. In a publish-subscribe model, each message can be consumed by all consumers. Kafka provides a separate consumer abstraction that shares the characteristics of both patterns —- Consumer Group schema.
\
Queue mode:
\
\
Publisher-subscriber model:
\
\
\
Kafka Consumer Group mode:
\
\
Consumers identify themselves with the consumer group name, and each message from each topic is sent to all consumer groups, but only to one consumer instance of each consumer group. These consumer instances can be distributed in different processes or on different machines.
\
If all consumer instances belong to the same consumer group, this pattern is the traditional message queue pattern, with load balancing between all consumers.
\
If all consumer instances belong to different consumer groups, the pattern is publish-subscribe, and all messages are broadcast to all consumers.
\
Generally speaking, topics are consumed by a small number of consumers groups, each of which is a topic logical consumer. Each consumer group is composed of many consumer instances, which have advantages in extensibility and fault tolerance. Kafka differs from the publish-subscribe message queue pattern only in that Subscribers to Kafka are clusters of consumers rather than individual processes.
\
Kafka has a stronger guarantee of message order than traditional messaging systems.
\
The traditional queue pattern stores messages sequentially on the server, and if multiple consumers consum from the queue, the server sends messages in the order in which they are stored. However, even if the server sends messages sequentially, since the messages are sent asynchronously to consumers, the messages may not reach consumers in the order in which they are stored (for example, the messages on the server are stored in the order M1, M2, M3, M4, and consumers are C1, C2, C3, C4, If the message is sent asynchronously, M1 is sent to C2, M2 is sent to C4, M3 is sent to C1, and M4 is sent to C3, then the time order of the message to the consumer may be M4, M2, M1, M3. As a result, the order of the message processed by the consumer is inconsistent with the order in which the message is stored, thus interrupting the original message order. This means that message order is lost in parallel consumption. Messaging systems typically work around the principle of consumer uniqueness, that is, only one consumer is allowed on a queue, but this means the loss of parallel processing.
\
Kafka does this very well. Kafka provides parallel processing — partition– and within topics, Kafka can either guarantee message order or provide load balancing through a pool of consumer processes. This is done by assigning OPICS partitions to different consumers in the consumer group, so that each partition can be consumed by a consumer in the consumer group. The above allocation guarantees that a consumer will be a certain consumer of a partition, so that consumers will consume in the order in which the data is stored. The presence of multiple partitions enables load balancing between many consumer instances. Note that there cannot be more consumer instances in the same consumer group than partitions.
\
Kafka can only guarantee that messages within the same partition are generally ordered, but it cannot guarantee that messages between partitions under the same topic name are generally ordered. For most applications, distributing data to different partitions by key value is a useful way to keep messages in order for each partition. However, if all messages are required to be ordered, the topic is required to have only one partition, which means that there can only be one consumer process per consumer group.
\
\
Guarantees to ensure
\
At a high level, Kafka offers the following guarantees:
\
– Messages sent to a particular topic partition will be appended in the order they were sent. For example, if M1 and M2 are sent by the same producer, and M1 is sent earlier, M1’s offset in the partition is smaller than M2, which means M1 appears earlier in the message log.
\
-consumer The order in which the instance consumes messages is the same as the order in which messages are stored in Kafka.
\
– If replication-factor of topic is set to N, that is, if the backup server is set to N, the fault tolerance ensures that no submitted log messages are lost even if all n-1 backup servers fail.
\
More details about guarantees are given in the design section of the document.
\
\
1.2 the Use Cases
\
Here are some examples of widespread use of Apache Kafa. For an overview of these aspects, see this Blog Post.
\
Messaging
\
Kafka can replace a more traditional message broker. There are many reasons to use message brokers: to decouple data generation and data consumption modules, to cache unprocessed messages, and so on. Kafka has significant advantages over most messaging systems: better throughput, built-in partitioning, backup, and fault tolerance make kafka a better solution for large-scale messaging applications.
\
Historically, messaging applications tend to have low throughput, but also require low end-to-end latency and strong persistence, which Kafka can meet.
\
In this area Kafka is as good as traditional messaging systems like ActiveMQ or RabbitMQ.
Website Activity Tracking
The initial kakfa application example reconstructs the user behavior tracking pipeline to enable a series of real-time publish-subscribe streams. This means that site activities (page views, searches, or other actions taken by users) are posted to the central Topics name (one topic for each type of activity). These feed consumption can be used for real-time processing, real-time monitoring, and loading data into Hadoop or offline data warehouse systems for offline processing or presentation.
\
A large number of users viewing a page will result in a large number of messages viewing the page.
\
Metrics
\
Kafka is typically used to monitor operation type data. This involves aggregating statistics from distributed applications to produce aggregate information flows (feeds) of operation-type data.
\
Log Aggregation
\
Many people use Kafka as a log aggregation solution. Log aggregation typically collects offline log files and then centralizes these files (possibly similar to HDFS file servers) for processing. Kafka takes away the file details and instead presents them in a clearer abstraction, presenting the behavior log or event data as a stream of information. This enables lower latency in the process, while making it easier to support multiple data sources and distributed data consumption. Compared to a centralized log collection system like Scribe or Flume, Kafka provides the same functionality with greater persistence guarantees from a backup mechanism and lower end-to-end latency.
Stream Processing
\
Many users use Kafka as a message conduit between multiple levels of data processing: the original data is stored in different Kafka Topics and then aggregated, enhanced, or otherwise transformed into new Kafka Topics for later consumption. For example, the process of news recommendation is as follows: first get the article content from the RSS feed, then import topic named “articles”; Second, subsequent processing might normalize or simplify the content, and then import the content into a new topic. The final processing might be to try to recommend the content to the user. This process actually shows a flow chart of real-time traffic flowing between individual topics. Since 0.10.0.0, Apache Kafka has released a stream processing library called Kafka Streams, which has the advantage of being lightweight and performing well. It can perform the multilevel processing described above. In addition to Kafka Streams, there are several open source streaming tools available, including Apache Storm and Samza.
\
Event Sourcing
\
The Event Sourcing application design pattern is that state changes are recorded in a series of logs in chronological order. Kafka’s support for large-scale data storage makes it an effective way to handle event collection in the background.
\
Commit Log
\
Kafka can be used for distributed systems to perform log submission functions. Kafka improves backup reliability by backing up data between nodes. It also uses a resynchronization mechanism to ensure that failed nodes can restore the data that should have been stored. A log compaction mechanism helps with this application; In this application, Kafka is similar to the Apache BookKeeper project.
\
\
\
1.3 Quick Start
This tutorial assumes that you are new to Kafka or have not yet installed Kafka or ZooKeeper.
\
Download the code
\
Download the 0.10.0.0 release and unzip it:
\
> tar -xzf kafka_2.11-0.10.0.0tgz > CD kafka_2.11-0.10.0.0
\
Step 2: Start the server
\
Kafka uses ZooKeeper, so you need to start ZooKeeper Server first. You can easily start using scripts.
\
> bin/zookeeper server – start. Sh config/zookeeper properties [the 15:01:37 2013-04-22, 495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) …
Now start the Kafka server:
> bin/kafka-server-start.sh config/server.properties [2013-04-22 15:01:47,028] INFO Verifying properties (kafka. Utils. VerifiableProperties) [the 15:01:47 2013-04-22, 051] the INFO Property socket.. The send buffer. The bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) …
\
Create a topic
\
We can create a topic named “test” with only one partition and only one backup.
\
> bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
\
Now you can look at topic, using the command line:
\
> bin/kafka-topics.sh –list –zookeeper localhost:2181
test
\
Also, you can choose to configure your brokers to automatically create topics when sending a message to a topic that does not exist, rather than manually.
\
Step4: send some messages
\
Kafka has a command-line client that can take input from either a file or standard input and send that input as a message to the Kafka cluster. The default setting is that each line is sent as a separate message.
\
Run Producer, then input some messages to the controller and send them to the server.
\
> bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
This is a message
This is another message\
\
\
Start a Consumer
\
Kafka also has a consumer command line, which outputs messages to standard output.
\
> bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning
This is a message
This is another message
\
If you run these commands on different terminals, you can enter messages on the Producer terminal and then see them on the Consumer terminal.
\
All command-line tools have additional options; Do not enter any arguments when running the command, more details of the options will be printed.
\
Step 6: Set up a cluster of brokers
\
So far, the tests we’ve done above have been in a single broker environment, but that’s not interesting enough. For Kafka, a single-node broker is a cluster of one node, so nothing changes unless you start more broker instances. However, just to feel these changes, we expanded our cluster to three nodes (don’t worry about running out of machines, all three nodes are deployed on the same machine).
\
First, we need to create configuration files for each broker by making several copies of config/server.properties and configuring them accordingly:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
\
You can now edit the new configuration file and set it as follows:
\
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
\
The broker.id attribute is unique and is the permanent name of each node in the cluster. We needed to change the port and log directory simply because we were running all nodes on the same machine and we wanted to avoid all brokers registering the same port or overwriting their own data.
\
We have started ZooKeeper and the single node in the previous steps, so we only need to start two new nodes:
\
> bin/kafka-server-start.sh config/server-1.properties &
…
> bin/kafka-server-start.sh config/server-2.properties &
…
\
Now create a new topic and set Replication Factore to 3
> bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 3 –partitions 1 –topic my-replicated-topic
\
Ok, now that we have a cluster, how do we know which brokers are running? Run the “describe Topics” command and see:
\
> bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated- Topic Partition: 0 Leader: 1 Replicas: 1, 0 Isr: 1 0
\
The output is explained here. The first line is the total number of partitions, and each of the following lines represents information about one partition. Since the topic currently has only one partition, there is only one row.
\
The leader is the node responsible for the read and write operations on a given partition. Each node will be a randomly selected leader for partitions.
\
“Replicas” is a list of backup nodes for a given partition, whether they are leader or not, or whether they are still active.
\
Isr is a synchronized backup list. This is a subset of the backup list, that is, the backup nodes that are currently active and can be contacted by the Leader.
\
Note: in my example, 1 is the leader of the only partition in the topic.
\
We can run the same command line to view information about the topic we originally created:
> bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Of course, the original Topic had no replica backup and was on Server 0, which was the only server in our cluster when we created this server.
\
Let’s send some messages to the new topic:
\
> bin/kafka-console-producer.sh –broker-list localhost:9092 –topic my-replicated-topic
. my test message 1 my test message 2 ^C
\
These messages can now be consumed
\
> bin/kafka-console-consumer.sh –zookeeper localhost:2181 –from-beginning –topic my-replicated-topic
.
my test message 1
my test message 2
^C
\
Now we can test kafka’s fault tolerance. Broker 1 always acts as the leader, so we can kill it:
\
> ps | grep server-1.properties
7564 ttys002 0:15. 91 / System/Library/Frameworks/JavaVM framework Versions / 1.6 / Home/bin/Java… > kill -9 7564
\
Leadership moves to one of the two led, and Node 1 is out of sync.
Backup Settings:
\
> bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-replicated-topic Topic:my-replicated-topic Replicationfactor PartitionCount: 1:3 configs: Topic: my – the replicated – Topic Partition: 0 leader: 2 replicas: 1 0 Isr: 2, 0
\
But the message can still be used, even if the leader who originally did the writing has been killed.
\
> bin/kafka-console-consumer.sh –zookeeper localhost:2181 –from-beginning –topic my-replicated-topic
.
my test message 1
my test message 2
^C
\
\
Use Kafka Connect to import/export data
\
Using terminals to write data or write data back to terminals is a convenient way to test when you first start using Kafka, but later you may import data into Kafka from other sources or export data from Kafka to other systems. For many systems, data can be imported or exported using the Kafka Connect tool without having to develop client-side inheritance code. Kafka Connect is a built-in Kafka tool designed to import or export data and is extensible. Connectors Connect to external systems using client logic. In QuickStart, you can see how Kafka Connect uses simple Connectors to import data from a file to Kafka Topic and export data from a Kafka Topic to a file. First, create some files containing the information flow to test:
\
> echo -e "foo\nbar" > test.txt
Copy the code
Next, we will launch two connectors running in singleton mode, which means they are running in a separate, local, specialized process. We provide three configuration files as input parameters. The first is the configuration of the Kafka Connect process, which is mainly the general configuration, such as the Kafka Brokers of the connection and the data serialization format. Each of the remaining configuration files is used to create the connector. These configuration files include a unique Connector name, instantiated Connector classes, and other configurations required by the connector.
\
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Copy the code
These simple profiles are built into Kafka and use the default local cluster configuration to create two Connectors: the first is the source connector, reading line-by-line data from the input file and then sending this data to a Topic in Kafka. The second is the destination connector, which reads messages from Kafka’s TOIC and then writes each message as a line to the output file. During startup, you will see a number of logs, some of which actually indicate the connectors are being instantiated. Once the Kafka Connect process is started, the source connector should start reading data line by line from the following files:
test.txt
Also send these data to topic:
connect-test
At the same time, the destination connector should start reading messages from the topic:
connect-test
And writes the message to a file:
test.sink.txt
We can verify that each line of data was sent completely by checking the contents of the file:
> cat test.sink.txt
foo
bar
Copy the code
\
Note that the data is stored in Kafka Topic
connect-test
Therefore, you can run the terminal Consumer tool script to view the data in the Topic (or verify this through the client Consumer code) :
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
Copy the code
Connectors are constantly processing data, so you can add data to test.txt, as you can see through the Kafka pipe:
> echo "Another line" >> test.txt
Copy the code
You should be able to export this line of data to the consumer terminal and also to test.sink.txt
\
\
Use Kafka Streams to process data
Kafka Streams is a Kafka client library that is used to capture real-time stream processing and analyze data stored in Kafka Brokers. This example will show how to use the library to run a streaming processing application. Here is the main code for WordCountDemo (which is easier to read when converted to Java8 lambda expressions) :
KTable wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Ensure the words are available as record keys for the next aggregate operation.
.map((key, value) -> new KeyValue<>(value, value))
// Count the occurrences of each word (record key) and store the results into a table named "Counts".
.countByKey("Counts")
Copy the code
\
This example implements the WordCount algorithm, which calculates the frequency of words in the input text. However, instead of calculating fixed-size data like the other examples you’ve seen before, the WordCount demo application is a little different and computes based on a never-ending stream of data. Much like a model that computes fixed data, it also updates word frequency calculations. However, because it is based on a never-ending stream of data, it periodically outputs the current calculation, and it keeps processing more data because it does not know when the stream will end.
\
You can now import input data into Kafka Topic, which is processed by the Kafka Streams application:
\
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
Copy the code
\
We then send the data to a stream-file-input topic, which can be done using the producer endpoint (in fact, the streaming data will continue to flow into Kafka and the application will start and run) :
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
Copy the code
> cat file-input.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input
Copy the code
\
Next, you can run the WordCount demo application to process the input data:
\
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
Copy the code
\
There is no standard output, and the results are constantly written back to another topic in Kafka called stream-Wordcount-Output. The demo will run for a few seconds, after which it will terminate automatically, unlike a typical stream processing application.
\
Now you can examine the output of the WordCount demo application:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Copy the code
\
The terminal will print the following data:
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
Copy the code
\
The first column is the key of the Kafka message and the second column is the message value, both in java.lang.String format. Note that the output should actually be a continuous update of the data stream, and that each record in the data stream (for example, each line in the output above) is either a number of individual terms, or a number of recorded keys, such as “kafka” above. If multiple records have the same key, each subsequent record updates the previous record.
\
Now that you can write more messages to the stream-file-input topic, you can observe that more messages are sent to the Stream-wordcount-output topic, reflecting the number of words that have been updated.
\
You can end the consumer by pressing Ctrl+C.
\
\
1.4 Ecosystem
\
In addition to its main distributed features, Kafka can be integrated with many tools.
Ecosystem Page lists a number of tools that can be integrated, including streaming systems, Hadoop integration, detection, and deployment tools.
\
\
Upgrade From Previous Versions
Upgrading from 0.8.x or 0.9.x to 0.10.0.0
0.10.0.0 has potential breaking changes (please review before upgrading) and possible performance impact following the upgrade . By following the recommended rolling upgrade plan below, you guarantee no downtime and no performance impact during and following the upgrade.
Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
Notes to Clients with Version 0.9.0.0: Due to a bug introduced in 0.9.0.0, clients that depend on ZooKeeper (old Scala high-level Consumer and MirrorMaker if used with the old consumer) will not Work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9.0.1 before brokers are upgraded to 0.10.0.x. This step is not necessary for upgrading 0.8 X or 0.9.0.1 clients.
For a rolling upgrade:
-
Update server.properties file on all brokers and add the following properties:
- System. The broker. The protocol version = CURRENT_KAFKA_VERSION (e.g. 0.8.2 or 0.9.0.0).
- log.message.format.version=CURRENT_KAFKA_VERSION (See potential performance impact following the upgrade for the details on what this configuration does.)
-
Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
-
Once the entire cluster is upgraded, Bump the protocol version by editing system. The broker. Protocol. The version and setting it to 0.10.0.0. NOTE: You shouldn’t touch log.message.format.version yet – this parameter should only change once all consumers have been Upgraded to 0.10.0.0
-
Restart the brokers one by one for the new protocol version to take effect.
-
Once all consumers have been upgraded to 0.10.0, Change the message. The format. The version to 0.10.0 on each broker and restart them one by one.
Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
Potential performance impact following upgrade to 0.10.0.0
The message format in 0.10.0 includes a new timestamp field and uses relative offsets for compressed messages. The on disk message format can be configured through log.message.format.version in the server.properties file. The default On-disk message format is 0.10.0. If a consumer client is on a version before 0.10.0.0, It only acknowledges message formats before 0.10.0. In this case, The broker is able to convert messages from the 0.10.0 format to an earlier format before sending the response to the consumer on an older version. However, the broker can’t use zero-copy transfer in this case. Reports from the Kafka community on the performance impact have shown CPU utilization going from 20% before to 100% after an upgrade, which forced an immediate upgrade of all clients to bring performance back to normal. To avoid such message conversion Before consumers are upgraded to 0.10.0.0, One can set the message. The format. The version to 0.8.2 or 0.9.0 when upgrading the broker to 0.10.0.0. This way, the broker can still use zero-copy transfer to send the data to the old consumers. Once consumers are upgraded, One can change the message format to 0.10.0 on the broker and enjoy the new message format that includes new timestamp and improved compression. The conversion is supported to ensure compatibility and can be useful to support a few apps that have not updated to newer clients yet, but is impractical to support all consumer traffic on even an overprovisioned cluster. Therefore it is critical to avoid the message conversion as much as possible when brokers have been upgraded but the majority of clients have not.
For clients that are upgraded to 0.10.0.0, there is no performance impact.
Note: By setting the message format version, one certifies that all existing messages are on or below that message format version. Otherwise consumers before 0.10.0.0 might break. In particular, after the message format is set to 0.10.0, one should not change it back to an earlier format as it may break consumers on versions before 0.10.0.0.
Note: Due to the additional timestamp introduced in each message, producers sending small messages may see a message throughput degradation because of the increased overhead. Likewise, replication now transmits an additional 8 bytes per message. If you’re running close to the network capacity of your cluster, it’s possible that you’ll overwhelm the network cards and see failures and performance issues due to the overload.
Note: If you have enabled compression on producers, you may notice reduced producer throughput and/or lower compression rate on the broker in some cases. When receiving Compressed messages, 0.0.1 brokers avoid recompressing the messages, which in general reduces the latency and improves the throughput. In certain cases, however, this may reduce the batching size on the producer, which could lead to worse throughput. If this happens, users can tune linger.ms and batch.size of the producer for better throughput. In addition, the producer buffer used for compressing messages with snappy is smaller than the one used by the broker, which may have a negative impact on the compression ratio for the messages on disk. We intend to make this configurable in a future Kafka release.
Potential breaking changes in 0.10.0.0
- Starting from Kafka 0.10.0.0, the message format version in Kafka is represented as the Kafka version. Message Format 0.9.0 refers to the highest message version supported by Kafka 0.9.0.
- Message format 0.10.0 has been introduced and it is used by default. It includes a timestamp field in the messages and relative offsets are used for compressed messages.
- ProduceRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0
- FetchRequest/Response V2 has been introduced and is used by default to support message format 0.10.0
- MessageFormatter interface was changed from
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
todef writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
- MessageReader interface was changed from
def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
todef readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
- MessageFormatter’s package was changed from
kafka.tools
tokafka.common
- MessageReader’s package was changed from
kafka.tools
tokafka.common
- MirrorMakerMessageHandler no longer exposes the
handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])
method as it was never called. - The 0.7 kafkamigrational tool is no longer packaged with Kafka. If you need to migrate from 0.7 to 0.10.0, Please migrate to 0.8 first and then follow the documented upgrade process to upgrade from 0.8 to 0.10.0.
- The new consumer has standardized its APIs to accept
java.util.Collection
Existing code may have to be updated to work with the 0.10.0 Client Library. - LZ4-compressed message handling was changed to use an interoperable framing specification (LZ4f v1.5.1). To maintain compatibility with old clients, this change only applies to Message format 0.10.0 and later. Clients that Produce/Fetch LZ4-compressed messages using v0/v1 (Message format 0.9.0) should continue to use the 0.9.0 framing implementation. Clients that use Produce/Fetch protocols v2 or later should use interoperable LZ4f framing. A list of interoperable LZ4 libraries is available at www.lz4.org/
For changes in 0.10.0.0
- Starting from Kafka 0.10.0.0, a new client library named Kafka Streams is available for stream processing on data stored in Kafka topics. This new Client library only works with 0.0.x and upward versioned brokers due to message format changes mentioned above. For more information please read this section.
- The default value of the configuration parameter
receive.buffer.bytes
is now 64K for the new consumer. - The new consumer now exposes the configuration parameter
exclude.internal.topics
to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled. - The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible.
- The new consumer API has been marked stable.
Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0
0.9.0.0 has potential breaking changes (please review before upgrading) and an inter-broker protocol change from upgrading previous versions. This means that upgraded brokers and clients may not be compatible with older versions. It is important that you upgrade your Kafka cluster before upgrading your clients. If you are using MirrorMaker downstream clusters should be upgraded first as well.
For a rolling upgrade:
- The Update server. The properties file on all brokers and add the following property: Intel. Broker. Protocol. The version = 0.8.2. X
- Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
- Once the entire cluster is upgraded, Bump the protocol version by editing system. The broker. Protocol. The version and setting it to 0.9.0.0.
- Restart the brokers one by one for the new protocol version to take effect
Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
Potential breaking changes in 0.9.0.0
- Java 1.6 is no longer supported.
- Scala 2.9 is no longer supported.
- Broker IDs above 1000 are now reserved by default to automatically assigned broker IDs. If your cluster has existing broker IDs above that threshold make sure to increase the reserved.broker.max.id broker configuration property accordingly.
- Configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync.
- Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since last fetch request from replica, but also to time since the replica last caught up. Replicas that are still fetching messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync.
- Compacted topics no longer accept messages without key and an exception is thrown by the producer if this is attempted. In 0.8.x, a message without key would cause the log compaction thread to subsequently complain and quit (and stop compacting all compacted topics).
- MirrorMaker no longer supports multiple target clusters. As a result it will only accept a single –consumer.config parameter. To mirror multiple source clusters, you will need at least one MirrorMaker instance per source cluster, each with its own consumer configuration.
- Tools packaged under org.apache.kafka.clients.tools.* have been moved to org.apache.kafka.tools.*. All included scripts will still function as usual, only custom code directly importing these classes will be affected.
- The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh.
- The kafka-topics.sh script (kafka.admin.TopicCommand) now exits with non-zero exit code on failure.
- The kafka-topics.sh script (kafka.admin.TopicCommand) will now print a warning when topic names risk metric collisions due to the use of a ‘.’ or ‘_’ in the topic name, and error in the case of an actual collision.
- The kafka-console-producer.sh script (kafka.tools.ConsoleProducer) will use the new producer instead of the old producer be default, and users have to specify ‘old-producer’ to use the old producer.
- By default all command line tools will print all logging messages to stderr instead of stdout.
For changes in 0.9.0.1
- The new broker id generation feature can be disabled by setting broker.id.generation.enable to false.
- Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics.
- Default value of configuration parameter fetch.min.bytes for the new consumer is now 1 by default.
Deprecations 0.9.0.0 in
- Altering topic configuration from the kafka-topics.sh script (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality.
- The kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality.
- The kafka.tools.ProducerPerformance class has been deprecated. Going forward, please use org.apache.kafka.tools.ProducerPerformance for this functionality (kafka-producer-perf-test.sh will also be changed to use the new class).
- The producer config block.on.buffer.full has been deprecated and will be removed in future release. Currently its default value has been changed to false. The KafkaProducer will no longer throw BufferExhaustedException but instead will use max.block.ms value to block, after which it will throw a TimeoutException. If block.on.buffer.full property is set to true explicitly, it will set the max.block.ms to Long.MAX_VALUE and metadata.fetch.timeout.ms will not be honoured
Upgrading from 0.8.1 to 0.8.2
0.8.2 is fully compatible with 0.8.1. The upgrade can be done one broker at a time by simply bringing it down updating the code, and restarting it.
Upgrading from 0.8.0 to 0.8.1
The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it.
Upgrading from 0.7
Release 0.7 is incompatible with newer releases. Major changes were made to the API, ZooKeeper data structures, and protocol, and configuration in order to add replication (Which was missing in 0.7). The upgrade from 0.7 to later versions requires a special tool for migration. This migration can be done without downtime.\