Initial public account: Shi Zhenzhen’s grocery store ID: JJDLMN Personal WX: JJDLMN_
Most of the following operations can be visualized on the platform using Logi-Kafka-Manager;
@[TOC]
1.TopicCommand
1.1. The Topic created
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
Related optional parameters
parameter | describe | example |
---|---|---|
--bootstrap-server Specify the Kafka service |
Specifies the Kafka service to which to connect; If you have this parameter, then--zookeeper You don’t need to |
–bootstrap-server localhost:9092 |
--zookeeper |
Deprecated, connect to Kafka cluster through ZK connection mode; | — Zookeeper localhost:2181 or localhost:2181/kafka |
--replication-factor |
Number of replicas (no more than the number of brokers) If not, the default configuration in the cluster will be used | –replication-factor 3 |
--partitions |
Number of partitions. This is used to specify the number of partitions when creating or modifying a topic. If no parameter is provided at the time of creation, the default value in the cluster is used. Note that there will be a problem if the partition is smaller than before | –partitions 3 |
--replica-assignment |
Copy partition allocation mode; When creating a topic, you can specify the replica allocation. | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; This means that there are three partitions and three replicas for each assigned Broker. Comma separated identification partitions; Colon separations indicate duplicates |
--config <String: name=value> |
Use to set Topic level configuration to override default configuration.Only works when –create and –bootstrap-server are used together; See the attachment at the end of this article for the list of parameters that can be configured | For example, overwrite two configurations--config retention.bytes=123455 --config retention.ms=600001 |
--command-config <String: command file path > |
To configure the Admin Client launch configuration,Only works when –bootstrap-server is used together; | For example: setting the timeout for a request--command-config config/producer.proterties ; Then configure request.timeout.ms=300000 in the file |
1.2. Delete the Topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
Support regular expression matching Topic for deletion. Just enclose Topic with double quotation marks. For example: delete Topic starting with create_topic_byhand_zk;
bin/kafka-topics.sh –bootstrap-server localhost:9092 –delete –topic “create_topic_byhand_zk.*”
.
Represents any single character that matches other than the newline character \n. To match.., use…
, *,
: Matches the preceding subexpression zero or more times. To match the * character, use *.
. *
: arbitrary character
Delete any Topic (use caution)
bin/kafka-topics.sh –bootstrap-server localhost:9092 –delete –topic “.*?”
See regular expressions for more information
1.3.Topic partition scaling
ZK mode (not recommended)
>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
Kafka version >= 2.2 supports the following (recommended)
Expansion of a single Topic
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4
Batch scale (scale up to 4 Topic partitions for all regular expressions matched)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4
“. *?” The meaning of a regular expression is to match everything; You can match on demand
When a Topic has fewer partitions than the specified number, it throws an exception. However, it will not affect the normal progress of other topics;
Related optional parameters
parameter | describe | example |
---|---|---|
--replica-assignment |
Copy partition allocation mode; When creating a topic, you can specify the replica allocation. | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; This means that there are three partitions and three replicas for each assigned Broker. Comma separated identification partitions; Colon separations indicate duplicates |
PS: This is the full partition replica allocation configuration, but it is the new partition that is in effect. For example: before 3 partition 1 copy is like this
Broker-1 | Broker-2 | Broker-3 | Broker-4 |
---|---|---|---|
0 | 1 | 2 |
Now add a new partition,– Replica-Assignment 2,1,3,4; Partitions 0,1 swap brokers with each other
Broker-1 | Broker-2 | Broker-3 | Broker-4 | |
---|---|---|---|---|
1 | 0 | 2 | 3 |
But it doesn’t actually do that, because the Controller cuts off the first three; Only take the new partition allocation method, the original will not change
Broker-1 | Broker-2 | Broker-3 | Broker-4 | |
---|---|---|---|---|
0 | 1 | 2 | 3 |
1.4. Query Topic Description
1. Query a single Topic
sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal
Sh bin/kafka-topics. Sh — Topic “.*?” –bootstrap-server xxxx:9092 –describe –exclude-internal
Support for regular expression matching topics by wrapping the Topic in double quotation marks
Related optional parameters
parameter | describe | example | |
---|---|---|---|
--bootstrap-server Specify the Kafka service |
Specifies the Kafka service to which to connect; If you have this parameter, then--zookeeper You don’t need to |
–bootstrap-server localhost:9092 | |
--at-min-isr-partitions |
Omit some counts and configuration information when querying | --at-min-isr-partitions |
|
--exclude-internal |
Exclude Kafka internal topics such as__consumer_offsets-* |
--exclude-internal |
|
--topics-with-overrides |
Displays only topics that have been configured to override the default configuration, i.e., topics that are configured to override the default configuration. No partition information is displayed | --topics-with-overrides |
5. Query the Topic list
1. Query the list of all topics
sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal
2. Query Matches Topic List (regular expression)
The query
test_create_
A list of all topics at the beginning
sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"
Related optional parameters
parameter | describe | example |
---|---|---|
--exclude-internal |
Exclude Kafka internal topics such as__consumer_offsets-* |
--exclude-internal |
--topic |
You can match with a regular expression to display the topic name | --topic |
2.ConfigCommand
Config related operations; Dynamic configuration can override the default static configuration;
2.1 Query Configuration
Topic configuration query
Shows dynamic and static configuration for topics
1. Query a single Topic configuration (list only dynamic configuration)
Sh bin/kafka-configs.sh –describe –bootstrap-server XXXXX :9092 –topic test_create_topic or sh bin/kafka-configs.sh –describe –bootstrap-server XXXXX :9092 –topic test_create_topic or sh bin/kafka-configs.sh –describe –bootstrap-server XXXXX :9092 –topic test_create_topic — Describe –bootstrap-server 172.23.248.85:9092 –entity-type topics –entity-name test_create_topic
Sh bin/kafka-configs.sh –describe –bootstrap-server 172.23.248.85:9092 –entity-type topics
3. Query the detailed configuration of Topic (dynamic + static)
You just have to add one argument
--all
Other configuration/clients/users/brokers/broker – loggers queries
In the same way; Just need to
--entity-type
To the corresponding type line (switchable viewer/clients/users/brokers/broker – loggers)
Query Kafka version information
sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version
<font color=red> <font color=red> < / fontThe attachmentPart of < / font >
2.2 Add, delete and modify configuration--alter
–alter
Delete configuration: — delete – config k1 = v1, k2 = v2 add/modify configuration: – add – config k1, k2 option types: – the entity – type (switchable viewer/clients/users/brokers/broker –
loggers)
Type name: –entity-name
Topic adds/modifies dynamic configuration
--add-config
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999
Topic removes dynamic configuration
--delete-config
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms
The same applies to other configurations, except that the type is changed--entity-type
Type: (switchable viewer/clients/users/brokers/broker – loggers)
3. Replication scaling, partition migration, cross-path migration kafka-reassign-partitions
Please stamp [[Kafka operation] copy expansion capacity, data migration, copy redistribution, copy cross-path migration]() (If the dot does not appear, it means that the article has not been published, please wait patiently)
4.Topic sending kafka-console-producer.sh
4.1 No key message in production
## Producer bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties
Produces a key message with attributes — Property parse. Key =true
## Producer bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true
< font color = red > default message key and use “Tab” separation between news value, so the message in the key and the value do not use the escape character (\ t) < / font >
Optional parameters
parameter | Value types | instructions | Valid values |
---|---|---|---|
–bootstrap-server | String | The server to connect to must (unless you specify — broke-list) | Such as: host1: prot1, host2: prot2 |
–topic | String | Subject name of the message received (required) | |
–batch-size | Integer | The number of messages sent in a single batch | 200(default) |
–compression-codec | String | Compression codec | None, gzip(default)snappy, lz4, ZSTD |
–max-block-ms | Long | The maximum time that a producer will block during the sending of a request | 60000(default value) |
–max-memory-bytes | Long | The producer buffers the total memory waiting to be sent to the server | 33554432(default) |
–max-partition-memory-bytes | Long | Buffer size allocated for the partition | 16384 |
–message-send-max-retries | Integer | Maximum number of retry sends | 3 |
–metadata-expiry-ms | Long | Time Threshold for Forced Update of Metadata (ms) | 300000 |
–producer-property | String | A mechanism for passing custom properties to the generator | Such as: key = value |
–producer.config | String | The producer configuration properties file [–producer-property] takes precedence over the full path of this configuration profile | |
–property | String | Custom message readers | parse.key=true/false key.separator=<key.separator>ignore.error=true/false |
–request-required-acks | String | The acknowledgement method requested by the producer | 0, 1(default), ALL |
–request-timeout-ms | Integer | The acknowledgement timeout time requested by the producer | 1500(default value) |
–retry-backoff-ms | Integer | The wait time threshold for the metadata is refreshed before the producer retries | 100(default value) |
–socket-buffer-size | Integer | TCP receive buffer size | 102400(default value) |
–timeout | Integer | The time threshold for the message queue to wait for asynchronous processing | 1000(default) |
–sync | Synchronous sending of messages | ||
–version | Display the Kafka version | Displays as the local Kafka version when other parameters are not matched | |
–help | Print Help Information |
5. Topic consumption kafka-console-consumer.sh
1. New Client Beginning — From — Beginning (note this is a new client, it will not be consumed if it has already been consumed before). There is no client name specified below, so every execution is a new client that will be consumed from beginning
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning
2. Regular expression matches topic consumption — Whitelist consumes all topics
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –whitelist ‘.*’
Sh bin/kafka-console-consumer. Sh –bootstrap-server localhost:9092 –whitelist ‘.*’ –from — beginning
3. Display Key for consumption--property print.key=true
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –property print.key=true
4. Specify partition consumption--partition
Specifies the starting offset consumption--offset
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –partition 0 –offset 100
5. Name the client--group
Note that after naming the client, if it has been consumed before, then –from-beginning will no longer be consumed from the beginning
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –group test-group
6. Add client properties--consumer-property
This parameter can also add properties to the client, but note that the same property cannot be configured in multiple places. They are mutually exclusive. For example, add the attribute –group test-group — to the following
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test
--consumer-property group.id=test-consumer-group
7. Add client property –consumer.config
–consumer.config –consumer.config –consumer.config –consumer.config –consumer.config –consumer.config –consumer.config –consumer.config –consumer.config –consumer.config
sh bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –consumer.config config/consumer.properties
parameter | describe | example | |
---|---|---|---|
--group |
Specify the ID of the group to which the consumer belongs | ||
--topic |
The topic being consumed | ||
--partition |
Specify partition; Unless specified- offset Otherwise, start consumption from the end of the partition (Latest) |
--partition 0 |
|
--offset |
The starting offset position of the execution consumption; Default: Latest; /latest /earliest/shift | --offset 10 |
|
--whitelist |
Regular expression matching topic;--topic I don’t have to specify it; All topics that match will be consumed; And of course with this parameter,--partition --offset And so on can’t be used |
||
--consumer-property |
A user-defined attribute is passed to the consumer as key=value | --consumer-property group.id=test-consumer-group |
|
--consumer.config |
Note the consumer configuration properties file, [consumer-property ] takes precedence over this configuration |
--consumer.config config/consumer.properties |
|
--property |
Initializes the properties of the message formatter | Print.timestamp =true,false, print.key=true,false, print.value=true,false, key.separator=<key.separator> Separator =<line. Separator >, key. Deserializer =<key. Deserializer >, value. Deserializer =<value. Deserializer > | |
--from-beginning |
Start with the earliest message that exists, not the latest message. Note that if the client name is configured and consumed before, it will not be consumed from scratch | ||
--max-messages |
The maximum amount of data consumed, if not specified, continues to be consumed | --max-messages 100 |
|
--skip-message-on-error |
If an error occurs while processing a message, skip it instead of pausing it | ||
--isolation-level |
Set to READ_COMMITTED to filter out uncommitted transactional messages. Set to READ_UNCOMMITTED to read all messages. Default: READ_UNCOMMITTED | ||
--formatter |
Kafka. Tools. DefaultMessageFormatter, kafka. Tools. LoggingMessageFormatter, kafka. View NoOpMessageFormatter, kafka. View Che cksumMessageFormatter |
6. Kafka-leader-election
6.1 Specify Topic Specify partition with rePreferred: Preferred copy policy
Conduct a Leader re-election
> sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0
6.2 For all Topics, all partitions are restartedPreferred: Preferred copy policy
Conduct a Leader re-election
sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred --all-topic-partitions
6.3 Set up the configuration file to batch specify topics and partitions for Leader re-election
Start by configuring the leader-election.json file
{ "partitions": [ { "topic": "test_create_topic4", "partition": 1 }, { "topic": "test_create_topic4", "partition": 2}}]
sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json
Related optional parameters
parameter | describe | example |
---|---|---|
--bootstrap-server Specify the Kafka service |
Specifies the Kafka service to which to connect | –bootstrap-server localhost:9092 |
--topic |
Specifies Topic, which follows--all-topic-partitions andpath-to-json-file The three mutually exclusive |
|
--partition |
Specify the partition with--topic Collocation is used |
|
--election-type |
Two electoral strategies (PREFERRED: [Fixed] Priority copy election, which will fail if the first copy is not onlineUNCLEAN Strategy:) |
|
--all-topic-partitions |
All topics all partitions perform Leader re-election; This parameter with--topic andpath-to-json-file The three mutually exclusive |
|
--path-to-json-file |
Configuration file batch election, this parameter with--topic andall-topic-partitions The three mutually exclusive |
Kafka-verifiable -producer.sh Continually push batch messages kafka-verifiable-producer.sh
Sending 100 messages at a time--max-messages 100
The default number of pushes is -1, which means push until the process is closed
sh bin/kafka-verifiable-producer.sh –topic test_create_topic4 –bootstrap-server localhost:9092
--max-messages 100
Sends a maximum throughput of no more than messages per second--throughput 100
Throughput of pushing messages, per messages/ SEC. Default is -1, which means there is no limit
sh bin/kafka-verifiable-producer.sh –topic test_create_topic4 –bootstrap-server localhost:9092
--throughput 100
The body of the message sent is prefixed--value-prefix
sh bin/kafka-verifiable-producer.sh –topic test_create_topic4 –bootstrap-server localhost:9092
--value-prefix 666
Note –value-prefix 666 must be an integer and the format of the body of the message sent is a dot. For example: 666.
–producer.config CONFIG_FILE Specifies the producer’s configuration file — the ACK value of each push message by acks. The default value is -1
Kafka-verifiable -consumer)
Continue to consumption
sh bin/kafka-verifiable-consumer.sh –group-id test_consumer –bootstrap-server localhost:9092 –topic test_create_topic4
Maximum consumption of 10 messages at a time--max-messages 10
sh bin/kafka-verifiable-consumer.sh –group-id test_consumer –bootstrap-server localhost:9092 –topic test_create_topic4
--max-messages 10
Related optional parameters
parameter | describe | example |
---|---|---|
--bootstrap-server Specify the Kafka service |
Specifies the Kafka service to which to connect; | –bootstrap-server localhost:9092 |
--topic |
Specify the topic to consume | |
--group-id |
Consumer ID; Each time you do not specify a new group ID | |
group-instance-id |
Consuming group instance ID with unique value | |
--max-messages |
Maximum number of messages consumed at one time | |
--enable-autocommit |
Whether to enable offset auto-commit; The default is false | |
--reset-policy |
When there is no previous consumption record, select the strategy to pull offset, which can beearliest .latest .none . The default is the earliest |
|
--assignment-strategy |
Consumer assigns a partitioning policy. Default isorg.apache.kafka.clients.consumer.RangeAssignor |
|
--consumer.config |
Specify the configuration file for Consumer |
9. Producer stress test kafka-producer-perf-test.sh
1. 1024 messages sent--num-records 100
And each message size is 1KB--record-size 1024
The maximum throughput is 10000 pieces per second--throughput 100
sh bin/kafka-producer-perf-test.sh –topic test_create_topic4 –num-records 100 –throughput 100000 –producer-props bootstrap.servers=localhost:9092 –record-size 1024
You can go throughLogIKMCheck to see if the partition has increased the corresponding data size
fromLogIKMYou can see that 1024 messages were sent; And the total data amount =1M; 1024 *1024byte = 1M;
2. Use the specified message file--payload-file
Sending 100 messages with maximum throughput of 100 messages per second--throughput 100
- Configure the good news file first
batchmessage.txt
-
Then the message will be randomly selected from BatchMessage.txt. Note that we don’t use the — Payload-Delimeter parameter to specify the separator. The default separator is \n newline;
bin/kafka-producer-perf-test.sh –topic test_create_topic4 –num-records 100 –throughput 100 –producer-props bootstrap.servers=localhost:9090 –payload-file config/batchmessage.txt
-
Validate the message so that you can view the sent message through LogiKM
Related optional parameters
parameter | describe | example |
---|---|---|
--topic |
Specify the topic to consume | |
--num-records |
How many messages are sent | |
--throughput |
Maximum message throughput per second | |
--producer-props |
Producer configuration, k1=v1,k2=v2 | --producer-props bootstrap.servers= localhost:9092,client.id=test_client |
--producer.config |
Producer profile | --producer.config config/producer.propeties |
--print-metrics |
Print monitoring information at the end of test, default false | --print-metrics true |
--transactional-id |
The default value is performance-producer-default-transactional ID, which is used to test the performance of a concurrent transaction. This is only valid if the transaction is — transaction-during-ms > 0. The default value is performance-producer-default-transactional ID | |
--transaction-duration-ms |
Specifies the maximum duration of a transaction before a commitTransaction call is made to commit the transaction. Transactions are started only if a value of > 0 is specified. The default value is 0 | |
--record-size |
The size of a message byte; You must specify one of the –payload-file and the other, but not both | |
--payload-file |
Specifies the source file of the message. Only text files encoded in UTF-8 are supported. The message delimiter of the file is passed--payload-delimeter Specify, which by default is split with newline \nl, and –record-size must specify one of the two, but not both; If the message is provided |
|
--payload-delimeter |
If through--payload-file If the message content is retrieved from a file, this parameter specifies the message delimiter for the file. The default value is \n, meaning that each line of the file is considered a message. If not specified --payload-file This parameter is not in effect; When sending a message, select the message to send in a random file; |
10. Kafka-consumer-perf-test.sh
Consume 100 messages --messages 100
sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 –bootstrap-server localhost:9090 –messages 100
Related optional parameters
parameter | describe | example | |
---|---|---|---|
--bootstrap-server |
|||
--consumer.config |
Consumer profile | ||
--date-format |
The results are printed out in time format | Default: yyyy-mm-dd HH: MM :ss:SSS | |
--fetch-size |
Gets the size of the data on a single request | The default is 1048576 | |
--topic |
Specify the topic to consume | ||
--from-latest |
|||
--group |
Consumer group ID | ||
--hide-header |
If set, the header information is not printed | ||
--messages |
The amount that needs to be consumed | ||
--num-fetch-threads |
The number of threads of feth data | Default: 1. | |
--print-metrics |
Print the monitoring data at the end | ||
--show-detailed-stats |
|||
--threads |
Number of consuming threads; | The default 10 |
11. Delete the message kafka-delete-records.sh from the specified partition
Delete a message from a partition of the specified topic until offset is 1024
Json file offset-json-file.json
{"partitions":
[{"topic": "test1", "partition": 0,
"offset": 1024}],
"version":1
}
Executing a command
Sh bin/kafka-delete-records. Sh –bootstrap-server 172.23.250.249:9090 –offset-json-file config/offset-json-file
Verify that the message sent is viewed through LogiKM
You can see it here, configuration"offset": 1024
Delete the message from the beginning to the offset of 1024; It’s deleted from the front
12. Check the Broker disk information
Query for the specified topic disk information --topic-list topic1,topic2
sh bin/kafka-log-dirs.sh –bootstrap-server xxxx:9090 –describe –topic-list test2
Query the specified Broker disk information--broker-list 0 broker1,broker2
sh bin/kafka-log-dirs.sh –bootstrap-server xxxxx:9090 –describe –topic-list test2 –broker-list 0
For example, I found information about a Topic with 3 partitions and 3 replicas in the log.dir Broker
{
"version": 1,
"brokers": [{
"broker": 0,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 1,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 2,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 3,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
"error": null,
"partitions": []
}]
}]
}
If you find it difficult to query disk information by command, you can also useLogIKMTo view
12. Consumer Groups manage kafka-consumer-groups.sh
1. Look at the customer list--list
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list
Call MetaDataRequest to get a list of all online brokers and then send a ListGroupsRequest request to each Broker for consumer group data
2. View consumer group details--describe
DescribeGroupsRequest
View Consumer Group details--group
或 --all-groups
–bootstrap-server XXXXX :9090 — Describe –group test2_consumer_group –bootstrap-server XXXXX :9090 — Describe –group test2_consumer_group –bootstrap-server XXXXX :9090 — Describe –group test2_consumer_group
View all consumer group details
--all-groups
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups
View all topics consumed by this consumer group, their partitions, latest consumption offset, Log latest data offset, Lag unconsumed quantity, consumer ID and other information
Query consumer member information--members
All consumer group member information
sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090
Specify consumer group member information
sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090
Query consumer status information--state
All consumption group status information
sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090
Specifies consumer group status information
sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090
3. Delete the consumer group--delete
DeleteGroupsRequest
Delete the consumer group –delete
Deletes the specified consumer group
--group
sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090
Delete all consumer groups--all-groups
sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090
PS: If you want to delete a consumer group, you can only delete it successfully if all the clients of the consumer group have stopped consumption/are not online. Otherwise the following exception will be reported
Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
4. Reset the offset of the consumption group--reset-offsets
< / font > < / font > < / font > < / font > < / font > < / p > < p style = “>”; </font>
The following example takes the following parameters: –dry-run; This parameter indicates pre-execution and prints out the results to be processed. When you want to actually execute it, change the parameter to “excute”;
The following examples of reset modes are all –to — earliest;
Please refer to the following reset Offset mode to change to another mode as needed;
Resets the offset for the specified consumption group--group
Resets offsets for all topics of the specified consumer group
--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic
Resets the offset of the specified Topic for the specified consumer group--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2
Reset offsets for all consumer groups--all-group
Reset offsets for all topics for all consumer groups
--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic
Resets the offset of the specified Topic in all consumption groups--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2
— The mode that needs to be reset after reset-offsets
Relate to the reset Offset mode
parameter | describe | example |
---|---|---|
--to-earliest : |
Reset the offset to the original offset(find the original offset that has not been deleted) | |
--to-current : |
Reset offset directly to the current offset, which is LOE | |
--to-latest : |
Reset to last offset | |
--to-datetime : |
Reset offset to the specified time; Format for:YYYY-MM-DDTHH:mm:SS.sss ; |
- to - a datetime "2021-6-26 T00:00:00. 000" |
--to-offset |
Reset to the specified offset, but in general, match to more than one partition, in this case, all matches are reset to this value; If 1. Target maximum offset<--to-offset , reset to the target maximum offset at this time; 2. Target minimum offset>--to-offset , reset to minimum; 3. Otherwise it will reset to--to-offset The target value of;You don’t usually use this |
--to-offset 3465 |
--shift-by |
How many offsets to add or subtract according to the offset; Positive means forward increase; Negative retreating; And, of course, it matches all of them; | --shift-by 100 、--shift-by -100 |
--from-file |
Reset according to CVS documentation; I’m going to do it separately here |
--from-file
So let me just highlight
Some of the other modes above reset all partitions that are matched; It is not possible to reset each partition to a different offset. However,
--from-file
It allows us to be a little more flexible;
-
First configure the CVS document format to Topic: partition number: reset target offset
Test2, 0100 test2, 1200 test2, 2300
-
Execute the command
sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv
5. Delete offsetdelete-offsets
< / font > < / font > < / font > < / font > < / font > < / p > < p style = “>”; </font>
After the offset is removed, the next time the Consumer Group starts, it will start from scratch.
sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2
Related optional parameters
parameter | describe | example |
---|---|---|
--bootstrap-server |
Specifies the Kafka service to which to connect; | –bootstrap-server localhost:9092 |
--list |
List all consumer group names | --list |
--describe |
Query consumer description information | --describe |
--group |
Specified Consumption Group | |
--all-groups |
Specify all consumer groups | |
--members |
Query the member information of a consuming group | |
--state |
Query the status information of the consumer | |
--offsets |
This parameter lists the message’s offset information when querying the consumption group description information. By default it will have this parameter; | |
dry-run |
Using this parameter allows you to see the reset in advance when the offset is reset, before the actual execution is performed--excute ; The default isdry-run |
|
--excute |
The actual operation to reset the offset; | |
--to-earliest |
Reset offset to the earliest | |
to-latest |
Reset offset to the nearest |
The attachment
Some optional configurations of ConfigCommand
Topic related optional configuration
key | value | The sample |
---|---|---|
cleanup.policy | Clean up the strategy | |
compression.type | Compression type (usually recommended on the produce side) | |
delete.retention.ms | The retention time of the compressed log | |
file.delete.delay.ms | ||
flush.messages | Persistence Message restrictions | |
flush.ms | Persistence frequency | |
follower.replication.throttled.replicas | Flowwer copy flow limit format: section number: copy follower number; section number: copy follower number | 1-0, 1:1 |
index.interval.bytes | ||
leader.replication.throttled.replicas | Partition number: replica leader number | 0-0 draw |
max.compaction.lag.ms | ||
max.message.bytes | The maximum Batch message size | |
message.downconversion.enable | Message is backwards compatible | |
message.format.version | Message format version | |
message.timestamp.difference.max.ms | ||
message.timestamp.type | ||
min.cleanable.dirty.ratio | ||
min.compaction.lag.ms | ||
min.insync.replicas | The smallest ISR | |
preallocate | ||
retention.bytes | Log retention size (usually time bound) | |
retention.ms | Log retention time | |
segment.bytes | Segment size limits | |
segment.index.bytes | ||
segment.jitter.ms | ||
segment.ms | Segment cutting time | |
unclean.leader.election.enable | Whether to allow asynchronous replica selection |
Broker related optional configuration
key | value | The sample |
---|---|---|
advertised.listeners | ||
background.threads | ||
compression.type | ||
follower.replication.throttled.rate | ||
leader.replication.throttled.rate | ||
listener.security.protocol.map | ||
listeners | ||
log.cleaner.backoff.ms | ||
log.cleaner.dedupe.buffer.size | ||
log.cleaner.delete.retention.ms | ||
log.cleaner.io.buffer.load.factor | ||
log.cleaner.io.buffer.size | ||
log.cleaner.io.max.bytes.per.second | ||
log.cleaner.max.compaction.lag.ms | ||
log.cleaner.min.cleanable.ratio | ||
log.cleaner.min.compaction.lag.ms | ||
log.cleaner.threads | ||
log.cleanup.policy | ||
log.flush.interval.messages | ||
log.flush.interval.ms | ||
log.index.interval.bytes | ||
log.index.size.max.bytes | ||
log.message.downconversion.enable | ||
log.message.timestamp.difference.max.ms | ||
log.message.timestamp.type | ||
log.preallocate | ||
log.retention.bytes | ||
log.retention.ms | ||
log.roll.jitter.ms | ||
log.roll.ms | ||
log.segment.bytes | ||
log.segment.delete.delay.ms | ||
max.connections | ||
max.connections.per.ip | ||
max.connections.per.ip.overrides | ||
message.max.bytes | ||
metric.reporters | ||
min.insync.replicas | ||
num.io.threads | ||
num.network.threads | ||
num.recovery.threads.per.data.dir | ||
num.replica.fetchers | ||
principal.builder.class | ||
replica.alter.log.dirs.io.max.bytes.per.second | ||
sasl.enabled.mechanisms | ||
sasl.jaas.config | ||
sasl.kerberos.kinit.cmd | ||
sasl.kerberos.min.time.before.relogin | ||
sasl.kerberos.principal.to.local.rules | ||
sasl.kerberos.service.name | ||
sasl.kerberos.ticket.renew.jitter | ||
sasl.kerberos.ticket.renew.window.factor | ||
sasl.login.refresh.buffer.seconds | ||
sasl.login.refresh.min.period.seconds | ||
sasl.login.refresh.window.factor | ||
sasl.login.refresh.window.jitter | ||
sasl.mechanism.inter.broker.protocol | ||
ssl.cipher.suites | ||
ssl.client.auth | ||
ssl.enabled.protocols | ||
ssl.endpoint.identification.algorithm | ||
ssl.key.password | ||
ssl.keymanager.algorithm | ||
ssl.keystore.location | ||
ssl.keystore.password | ||
ssl.keystore.type | ||
ssl.protocol | ||
ssl.provider | ||
ssl.secure.random.implementation | ||
ssl.trustmanager.algorithm | ||
ssl.truststore.location | ||
ssl.truststore.password | ||
ssl.truststore.type | ||
unclean.leader.election.enable |
The Users related optional configuration
key | value | The sample |
---|---|---|
SCRAM-SHA-256 | ||
SCRAM-SHA-512 | ||
consumer_byte_rate | Limit the flow for the consumer user | |
producer_byte_rate | Current limiting for producers | |
request_percentage | Percentage of requests |
Clients related optional configurations
key | value | The sample |
---|---|---|
consumer_byte_rate | ||
producer_byte_rate | ||
request_percentage |
Most of the operations above can be visualized on the platform using Logi-Kafka-Manager;