Initial public account: Shi Zhenzhen’s grocery store ID: JJDLMN Personal WX: JJDLMN_

Most of the following operations can also be performed visually on the Logi-Kafka-Manager platform.


1. TopicCommand

1.1. Create a Topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test


Related optional parameters

| Parameter | Description | Example |
|---|---|---|
| --bootstrap-server | The Kafka service to connect to. When this parameter is given, --zookeeper is not needed. | --bootstrap-server localhost:9092 |
| --zookeeper | Deprecated. Connects to the Kafka cluster through ZooKeeper. | --zookeeper localhost:2181 or localhost:2181/kafka |
| --replication-factor | Number of replicas (no more than the number of brokers). If omitted, the cluster default is used. | --replication-factor 3 |
| --partitions | Number of partitions, used when creating or altering a topic. If omitted at creation, the cluster default is used. Note that specifying fewer partitions than the topic already has causes an error. | --partitions 3 |
| --replica-assignment | Manual replica assignment, specified when creating a topic. Commas separate partitions; colons separate replicas. | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 means three partitions with three replicas each, placed on the listed brokers. |
| --config <String: name=value> | Sets topic-level configuration that overrides the defaults. Only takes effect when --create and --bootstrap-server are used together. See the attachment at the end of this article for the configurable parameters. | Override two settings: --config retention.bytes=123455 --config retention.ms=600001 |
| --command-config <String: command file path> | Configuration file for the Admin Client. Only takes effect together with --bootstrap-server. | To set the request timeout: --command-config config/producer.properties, with request.timeout.ms=300000 in that file |
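The --replica-assignment string described in the table can be generated instead of typed by hand. The following is a minimal sketch, assuming broker IDs run from 0 to N-1 and a simple round-robin placement; the helper name build_replica_assignment is hypothetical, and Kafka's own default assignment works differently.

```shell
#!/bin/sh
# Hypothetical helper: build a --replica-assignment string.
# Assumes broker IDs are 0..(brokers-1) and places replicas round-robin.
# Partitions are separated by commas, replicas by colons.
build_replica_assignment() {
  partitions=$1
  replicas=$2
  brokers=$3
  result=""
  p=0
  while [ "$p" -lt "$partitions" ]; do
    entry=""
    r=0
    while [ "$r" -lt "$replicas" ]; do
      broker=$(( (p + r) % brokers ))
      if [ -z "$entry" ]; then entry="$broker"; else entry="$entry:$broker"; fi
      r=$((r + 1))
    done
    if [ -z "$result" ]; then result="$entry"; else result="$result,$entry"; fi
    p=$((p + 1))
  done
  printf '%s\n' "$result"
}

# 3 partitions, 3 replicas, 3 brokers
build_replica_assignment 3 3 3
```

The printed string could then be passed as the value of --replica-assignment when creating the topic.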

1.2. Delete the Topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test


Regular expressions are supported for matching the Topics to delete; just enclose the Topic in double quotation marks. For example, to delete the Topics starting with create_topic_byhand_zk:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "create_topic_byhand_zk.*"

. matches any single character except the newline character \n. To match a literal ., use \.

* matches the preceding subexpression zero or more times. To match a literal *, use \*

.* matches any sequence of characters

Delete every Topic (use with caution):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ".*?"

See a regular expression reference for more information.
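Because a regular expression can match more Topics than intended, it can help to preview the matches before deleting anything. The following is a rough sketch, assuming the whole Topic name must match the pattern; the helper preview_topic_match is hypothetical, and grep -E only approximates the Java regular expression engine that Kafka uses.

```shell
#!/bin/sh
# Hypothetical helper: preview which names a --topic regex would match.
# Reads topic names on stdin, one per line. The pattern is anchored so the
# whole name has to match. grep -E is only an approximation of Java regex.
preview_topic_match() {
  grep -E "^($1)\$"
}

# Sample topic names stand in for the output of a topic listing.
printf '%s\n' create_topic_byhand_zk1 create_topic_byhand_zk_a other_topic |
  preview_topic_match 'create_topic_byhand_zk.*'
```

In practice the sample names would be replaced by the output of the --list command, piped into the helper.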

1.3. Topic partition expansion

ZK mode (not recommended)

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

Kafka version >= 2.2 supports the following (recommended)

Expansion of a single Topic

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4

Batch expansion (expand every Topic matched by the regular expression to 4 partitions)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4

The regular expression ".*?" matches everything; adjust it to match what you need.

If the specified number is smaller than a Topic's current partition count, an exception is thrown for that Topic, but the other Topics are still processed normally.


Related optional parameters

| Parameter | Description | Example |
|---|---|---|
| --replica-assignment | Manual replica assignment. Commas separate partitions; colons separate replicas. | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 means three partitions with three replicas each, placed on the listed brokers. |

PS: The value passed is the full replica assignment for all partitions, but only the entries for the new partitions take effect. For example, with 3 partitions and 1 replica, the layout before expansion is:

| Broker-1 | Broker-2 | Broker-3 | Broker-4 |
|---|---|---|---|
| 0 | 1 | 2 | |

Now add one partition with --replica-assignment 2,1,3,4. Read literally, this would swap partitions 0 and 1 between their brokers:

| Broker-1 | Broker-2 | Broker-3 | Broker-4 |
|---|---|---|---|
| 1 | 0 | 2 | 3 |

In practice this does not happen: the Controller truncates the first three entries and applies only the assignment for the new partition, so the existing partitions stay where they are:

| Broker-1 | Broker-2 | Broker-3 | Broker-4 |
|---|---|---|---|
| 0 | 1 | 2 | 3 |
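The truncation described above can be illustrated with a small sketch. It assumes the behaviour is exactly as stated in this section (entries for existing partitions are dropped); the helper name effective_assignment is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: given the number of partitions that already exist and
# the full --replica-assignment string, print only the entries for the new
# partitions, which per the note above are the ones that take effect.
effective_assignment() {
  existing=$1
  assignment=$2
  printf '%s\n' "$assignment" | cut -d, -f"$((existing + 1))"-
}

# 3 existing partitions; only the 4th entry applies to the new partition.
effective_assignment 3 '2,1,3,4'
```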

1.4. Query Topic Description

1. Query a single Topic

sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal

Regular expressions are supported for matching Topics (the command above matches all of them); just wrap the Topic in double quotation marks.


Related optional parameters

| Parameter | Description | Example |
|---|---|---|
| --bootstrap-server | The Kafka service to connect to. When this parameter is given, --zookeeper is not needed. | --bootstrap-server localhost:9092 |
| --at-min-isr-partitions | Show only the partitions whose ISR count equals the configured minimum | --at-min-isr-partitions |
| --exclude-internal | Exclude Kafka internal topics such as __consumer_offsets-* | --exclude-internal |
| --topics-with-overrides | Show only the topics whose configuration overrides the defaults. No partition information is displayed. | --topics-with-overrides |

1.5. Query the Topic list

1. Query the list of all topics

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal

2. Query the list of Topics matching a regular expression

List all Topics whose names start with test_create_:

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"

Related optional parameters

| Parameter | Description | Example |
|---|---|---|
| --exclude-internal | Exclude Kafka internal topics such as __consumer_offsets-* | --exclude-internal |
| --topic | Topic name to display; a regular expression can be used for matching | --topic |

2. ConfigCommand

Config-related operations. Dynamic configuration can override the default static configuration.

2.1 Query configuration

Topic configuration query

Shows the dynamic and static configuration of Topics.

1. Query the configuration of a single Topic (lists only dynamic configuration)

sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic

or

sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic

2. Query the configuration of all Topics (lists only dynamic configuration)

sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics

3. Query the detailed configuration of a Topic (dynamic + static)

Just add the --all parameter.

Configuration queries for other entities (clients/users/brokers/broker-loggers)

These work the same way; just change --entity-type to the corresponding type (topics/clients/users/brokers/broker-loggers).

Query Kafka version information

sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version

For the configurable options, see the attachment section at the end of this article.

2.2 Add, delete and modify configuration --alter

Add/modify configuration: --add-config k1=v1,k2=v2

Delete configuration: --delete-config k1,k2

Entity type: --entity-type (topics/clients/users/brokers/broker-loggers)

Entity name: --entity-name

Add or modify dynamic Topic configuration --add-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999

Remove dynamic Topic configuration --delete-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms

Other entity types are configured the same way; just change --entity-type to one of topics/clients/users/brokers/broker-loggers.

3. Replica scaling, partition migration, cross-path migration kafka-reassign-partitions

See the separate article "[Kafka operation] Replica scaling, data migration, replica reassignment, and cross-path replica migration" (if no link is shown, the article has not been published yet; please be patient).

4. Producing to a Topic kafka-console-producer.sh

4.1 Produce messages without a key

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

4.2 Produce messages with a key by adding --property parse.key=true

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true

By default, the message key and the message value are separated by a Tab, so do not use the escape character (\t) inside the key or the value.
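To make the Tab-separated layout concrete, here is a small sketch that formats keyed messages for a producer started with parse.key=true. The helper name keyed_message and the sample keys and values are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: format one keyed message as "key<TAB>value", the
# default layout expected when --property parse.key=true is set.
keyed_message() {
  printf '%s\t%s\n' "$1" "$2"
}

keyed_message user-1 'first message'
keyed_message user-2 'second message'
```

The printed lines could be piped into the keyed producer command shown in 4.2 instead of being typed interactively.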


Optional parameters

| Parameter | Value type | Description | Valid values |
|---|---|---|---|
| --bootstrap-server | String | The server(s) to connect to; required unless --broker-list is specified | e.g. host1:port1,host2:port2 |
| --topic | String | (Required) Name of the topic that receives the messages | |
| --batch-size | Integer | Number of messages sent in a single batch | 200 (default) |
| --compression-codec | String | Compression codec | none, gzip (default), snappy, lz4, zstd |
| --max-block-ms | Long | Maximum time the producer blocks during a send request | 60000 (default) |
| --max-memory-bytes | Long | Total memory the producer uses to buffer records waiting to be sent to the server | 33554432 (default) |
| --max-partition-memory-bytes | Long | Buffer size allocated for a partition | 16384 |
| --message-send-max-retries | Integer | Maximum number of send retries | 3 |
| --metadata-expiry-ms | Long | Time threshold after which a metadata refresh is forced (ms) | 300000 |
| --producer-property | String | Passes a custom property to the producer | e.g. key=value |
| --producer.config | String | Full path of the producer configuration properties file; --producer-property takes precedence over this file | |
| --property | String | Custom properties for the message reader | parse.key=true/false, key.separator=<key.separator>, ignore.error=true/false |
| --request-required-acks | String | Acknowledgement mode required by producer requests | 0, 1 (default), all |
| --request-timeout-ms | Integer | Acknowledgement timeout of producer requests | 1500 (default) |
| --retry-backoff-ms | Integer | Time the producer waits for a metadata refresh before retrying | 100 (default) |
| --socket-buffer-size | Integer | TCP receive buffer size | 102400 (default) |
| --timeout | Integer | Time threshold a message waits in the queue for asynchronous processing | 1000 (default) |
| --sync | | Send messages synchronously | |
| --version | | Display the Kafka version | Shows the local Kafka version when no other parameters are given |
| --help | | Print help information | |

5. Consuming from a Topic kafka-console-consumer.sh

1. Consume from the beginning with a new client: --from-beginning (note that this applies to new clients; a client that has consumed before will not start from the beginning again). No client name is specified below, so every execution is a new client and consumes from the beginning.

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2. Consume Topics matching a regular expression: --whitelist

Consume all Topics:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*'

Consume all Topics from the beginning:

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*' --from-beginning

3. Display the key while consuming: --property print.key=true

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true

4. Consume a specified partition with --partition, starting from a specified offset with --offset

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5. Name the client: --group

Note that once the client is named, if it has consumed before, --from-beginning no longer starts from the beginning.

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group

6. Add client properties: --consumer-property

This parameter can also add properties to the client, but the same property must not be configured in more than one place; they are mutually exclusive. For example, adding --group test-group to the command below would not work.

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group

7. Add client properties: --consumer.config

This works like --consumer-property, except that the properties are read from a file; --consumer-property takes precedence over --consumer.config.

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties


| Parameter | Description | Example |
|---|---|---|
| --group | ID of the consumer group the consumer belongs to | |
| --topic | The topic to consume | |
| --partition | The partition to consume; unless --offset is specified, consumption starts from the end of the partition (latest) | --partition 0 |
| --offset | Starting offset for consumption: latest, earliest, or a numeric offset. Default: latest | --offset 10 |
| --whitelist | Regular expression for matching topics; --topic does not need to be specified. All matching topics are consumed. With this parameter, --partition, --offset and similar parameters cannot be used | |
| --consumer-property | Passes a user-defined property to the consumer as key=value | --consumer-property group.id=test-consumer-group |
| --consumer.config | Consumer configuration properties file; --consumer-property takes precedence over this file | --consumer.config config/consumer.properties |
| --property | Properties used to initialize the message formatter | print.timestamp=true/false, print.key=true/false, print.value=true/false, key.separator=<key.separator>, line.separator=<line.separator>, key.deserializer=<key.deserializer>, value.deserializer=<value.deserializer> |
| --from-beginning | Start from the earliest message present instead of the latest one. Note that if a client name is configured and it has consumed before, consumption does not start from the beginning | |
| --max-messages | Maximum number of messages to consume; if not specified, consumption continues indefinitely | --max-messages 100 |
| --skip-message-on-error | If an error occurs while processing a message, skip it instead of halting | |
| --isolation-level | Set to read_committed to filter out uncommitted transactional messages; set to read_uncommitted to read all messages. Default: read_uncommitted | |
| --formatter | Message formatter class | kafka.tools.DefaultMessageFormatter, kafka.tools.LoggingMessageFormatter, kafka.tools.NoOpMessageFormatter, kafka.tools.ChecksumMessageFormatter |

6. Leader re-election kafka-leader-election.sh

6.1 Re-elect the Leader for a specified partition of a specified Topic, using the PREFERRED (preferred replica) strategy

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0

6.2 Re-elect the Leader for all partitions of all Topics, using the PREFERRED (preferred replica) strategy

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred --all-topic-partitions

6.3 Use a configuration file to specify Topics and partitions for Leader re-election in batch

First create the leader-election.json file:

{ "partitions": [ { "topic": "test_create_topic4", "partition": 1 }, { "topic": "test_create_topic4", "partition": 2 } ] }

sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json
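Hand-written JSON is easy to get wrong (a misplaced bracket is enough to make the file unreadable), so the election file can be generated instead. The following is a minimal sketch for a single topic; the helper name leader_election_json is hypothetical, and it assumes the {"partitions": [...]} layout used in this section.

```shell
#!/bin/sh
# Hypothetical helper: print leader-election JSON for one topic and a list of
# partition numbers, in the {"partitions": [...]} layout used in this section.
leader_election_json() {
  topic=$1
  shift
  entries=""
  for partition in "$@"; do
    entry="{\"topic\": \"$topic\", \"partition\": $partition}"
    if [ -z "$entries" ]; then entries="$entry"; else entries="$entries, $entry"; fi
  done
  printf '{"partitions": [%s]}\n' "$entries"
}

# Partitions 1 and 2 of test_create_topic4
leader_election_json test_create_topic4 1 2
```

Redirecting the output to a file such as config/leader-election.json produces the input for --path-to-json-file.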
 

Related optional parameters

| Parameter | Description | Example |
|---|---|---|
| --bootstrap-server | The Kafka service to connect to | --bootstrap-server localhost:9092 |
| --topic | The Topic; mutually exclusive with --all-topic-partitions and --path-to-json-file | |
| --partition | The partition; used together with --topic | |
| --election-type | Election strategy, one of two. PREFERRED: preferred replica election, which fails if the preferred (first) replica is not online. UNCLEAN: unclean election, which may elect a replica that is not in the ISR | |
| --all-topic-partitions | Re-elect the Leader for all partitions of all Topics; mutually exclusive with --topic and --path-to-json-file | |
| --path-to-json-file | Batch election driven by a configuration file; mutually exclusive with --topic and --all-topic-partitions | |

7. Continuously push messages in batches kafka-verifiable-producer.sh

Send 100 messages in one run: --max-messages 100

The default is -1, which means messages are pushed until the process is stopped.

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100

Limit throughput to at most 100 messages per second: --throughput 100

Throughput of pushed messages, in messages/sec. The default is -1, which means no limit.

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100

Prefix the body of each message: --value-prefix

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666

Note that the --value-prefix value (666 here) must be an integer; the message body is prefixed with this value and a dot, so the bodies begin with 666.

Other parameters:

--producer.config CONFIG_FILE specifies the producer's configuration file.

--acks sets the acks value for each pushed message. The default value is -1.

8. Continuously consume messages in batches kafka-verifiable-consumer.sh

Consume continuously

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4

Consume at most 10 messages in one run: --max-messages 10

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10


Related optional parameters

| Parameter | Description | Example |
|---|---|---|
| --bootstrap-server | The Kafka service to connect to | --bootstrap-server localhost:9092 |
| --topic | The topic to consume | |
| --group-id | Consumer group ID; if not specified, a new group ID is used each time | |
| --group-instance-id | Consumer group instance ID; must be unique | |
| --max-messages | Maximum number of messages to consume in one run | |
| --enable-autocommit | Whether to enable automatic offset commit; the default is false | |
| --reset-policy | Offset strategy to use when there is no previous consumption record: earliest, latest, or none. The default is earliest | |
| --assignment-strategy | Partition assignment strategy for the consumer. Default: org.apache.kafka.clients.consumer.RangeAssignor | |
| --consumer.config | Configuration file for the consumer | |

9. Producer stress test kafka-producer-perf-test.sh

1. Send 1024 messages (--num-records 1024), each 1 KB in size (--record-size 1024), with a maximum throughput of 10000 messages per second (--throughput 10000)

sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 1024 --throughput 10000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024

You can use LogiKM to check whether the partitions have grown by the corresponding amount of data.

LogiKM shows that 1024 messages were sent and that the total data volume is 1 MB: 1024 * 1024 bytes = 1 MB.
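The expected data volume of a test run is simply the number of records multiplied by the record size. A short worked example, using the 1024 records of 1024 bytes discussed here (the variable names are only illustrative):

```shell
#!/bin/sh
# Total payload = number of records x record size in bytes.
num_records=1024
record_size=1024
total_bytes=$((num_records * record_size))
echo "total bytes: $total_bytes"
echo "total MiB: $((total_bytes / 1024 / 1024))"
```

Changing num_records or record_size gives the figure to compare against what the monitoring platform reports.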

2. Use a specified message file (--payload-file) to send 100 messages with a maximum throughput of 100 messages per second (--throughput 100)

  1. First prepare the message file batchmessage.txt

  2. Then send the messages; each one is picked at random from batchmessage.txt. Note that the --payload-delimiter parameter is not used here to specify a separator, so the default separator, the newline \n, applies.

    bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt

  3. Verify the messages; the sent messages can be viewed through LogiKM
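A payload file for step 1 can be produced with a few lines of shell. This is only a sketch: the helper name write_payload_file and the message text are hypothetical, and a temporary file is used here in place of config/batchmessage.txt.

```shell
#!/bin/sh
# Hypothetical helper: write a payload file with one message per line, which
# matches the default newline delimiter.
write_payload_file() {
  file=$1
  count=$2
  : > "$file"
  i=1
  while [ "$i" -le "$count" ]; do
    printf 'message-%d\n' "$i" >> "$file"
    i=$((i + 1))
  done
}

# Write 5 sample messages to a temporary file and show them.
payload_file=$(mktemp)
write_payload_file "$payload_file" 5
cat "$payload_file"
```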


Related optional parameters

| Parameter | Description | Example |
|---|---|---|
| --topic | The topic to send messages to | |
| --num-records | Number of messages to send | |
| --throughput | Maximum message throughput per second | |
| --producer-props | Producer configuration as space-separated k1=v1 k2=v2 pairs | --producer-props bootstrap.servers=localhost:9092 client.id=test_client |
| --producer.config | Producer configuration file | --producer.config config/producer.properties |
| --print-metrics | Print metrics at the end of the test; off by default | --print-metrics |
| --transactional-id | Transactional ID, used to test the performance of concurrent transactions. Only effective when --transaction-duration-ms > 0. Default: performance-producer-default-transactional-id | |
| --transaction-duration-ms | Maximum duration of a transaction before commitTransaction is called to commit it. Transactions are used only if a value > 0 is specified. The default value is 0 | |
| --record-size | Size of one message in bytes. Exactly one of --record-size and --payload-file must be specified | |
| --payload-file | Source file for the messages; only UTF-8 encoded text files are supported. The message delimiter of the file is specified with --payload-delimiter and defaults to the newline \n. Exactly one of --payload-file and --record-size must be specified | |
| --payload-delimiter | When message content is read from a file via --payload-file, this parameter specifies the message delimiter of the file. The default is \n, meaning each line of the file is one message. It has no effect if --payload-file is not specified. When sending, messages are picked at random from the file | |

10. Consumer stress test kafka-consumer-perf-test.sh

Consume 100 messages: --messages 100

sh bin/kafka-consumer-perf-test.sh --topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100


Related optional parameters

| Parameter | Description | Example |
|---|---|---|
| --bootstrap-server | | |
| --consumer.config | Consumer configuration file | |
| --date-format | Format used to print timestamps in the results | Default: yyyy-MM-dd HH:mm:ss:SSS |
| --fetch-size | Amount of data fetched in a single request | Default: 1048576 |
| --topic | The topic to consume | |
| --from-latest | | |
| --group | Consumer group ID | |
| --hide-header | If set, the header line is not printed | |
| --messages | Number of messages to consume | |
| --num-fetch-threads | Number of fetch threads | Default: 1 |
| --print-metrics | Print metrics at the end | |
| --show-detailed-stats | | |
| --threads | Number of consuming threads | Default: 10 |

11. Delete messages from a specified partition kafka-delete-records.sh

Delete the messages of one partition of the specified topic up to offset 1024

First create the JSON file offset-json-file.json:

{"partitions":
[{"topic": "test1", "partition": 0,
  "offset": 1024}],
  "version":1
}

Execute the command:

sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json

Verify the result by viewing the messages through LogiKM.

As you can see, with "offset": 1024 configured, the messages from the beginning of the partition up to offset 1024 are deleted; deletion starts from the front.
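The offset file for kafka-delete-records.sh can also be generated. Below is a minimal sketch for a single partition, assuming the same JSON layout as the file in this section; the helper name delete_records_json is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: print the JSON for deleting records of one partition
# up to a given offset. Arguments: topic, partition, offset.
delete_records_json() {
  printf '{"partitions": [{"topic": "%s", "partition": %d, "offset": %d}], "version": 1}\n' \
    "$1" "$2" "$3"
}

# Topic test1, partition 0, delete up to offset 1024
delete_records_json test1 0 1024
```

Redirecting the output to a file gives the input for --offset-json-file.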

12. Check Broker disk information kafka-log-dirs.sh

Query disk information for specified topics: --topic-list topic1,topic2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxx:9090 --describe --topic-list test2

Query disk information for specified Brokers: --broker-list broker1,broker2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxxx:9090 --describe --topic-list test2 --broker-list 0

For example, here is the log.dir information on each Broker for a Topic with 3 partitions and 3 replicas:

{
    "version": 1,
    "brokers": [{
        "broker": 0,
        "logDirs": [{
            "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
            "error": null,
            "partitions": [{
                "partition": "test2-1",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-0",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-2",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }]
        }]
    }, {
        "broker": 1,
        "logDirs": [{
            "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
            "error": null,
            "partitions": [{
                "partition": "test2-1",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-0",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-2",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }]
        }]
    }, {
        "broker": 2,
        "logDirs": [{
            "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
            "error": null,
            "partitions": [{
                "partition": "test2-1",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-0",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "test2-2",
                "size": 0,
                "offsetLag": 0,
                "isFuture": false
            }]
        }]
    }, {
        "broker": 3,
        "logDirs": [{
            "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
            "error": null,
            "partitions": []
        }]
    }]
}

If querying disk information from the command line feels inconvenient, you can also view it in LogiKM.
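When only the total disk usage matters, the "size" fields of the JSON output can be summed with standard text tools. The following is a rough sketch, not a JSON parser: the helper name total_log_size is hypothetical, and it assumes every "size" field in the input should be counted.

```shell
#!/bin/sh
# Hypothetical helper: sum every "size" value found in kafka-log-dirs.sh
# JSON output read from stdin. This is a plain-text scan, not a JSON parser.
total_log_size() {
  grep -o '"size":[[:space:]]*[0-9][0-9]*' |
    awk -F: '{ total += $2 } END { print total + 0 }'
}

# A shortened sample stands in for the real command output.
printf '%s\n' '{"partition": "test2-0", "size": 100}, {"partition": "test2-1", "size": 250}' |
  total_log_size
```

With a real cluster, the output of the --describe command would be piped into the helper in place of the sample line.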

13. Consumer group management kafka-consumer-groups.sh

1. List consumer groups --list

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list

This first calls MetadataRequest to get the list of all online Brokers, then sends a ListGroupsRequest to each Broker to obtain its consumer group data.

2. View consumer group details --describe

DescribeGroupsRequest

View the details of a specified consumer group --group

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group

View the details of all consumer groups --all-groups

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups

This shows all the Topics the consumer group consumes, with their partitions, the latest consumed offset, the log end offset, the Lag (number of unconsumed messages), the consumer ID and other information.

Query consumer member information --members

Member information for all consumer groups

sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090

Member information for a specified consumer group

sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090

Query consumer group state information --state

State information for all consumer groups

sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090

State information for a specified consumer group

sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090

3. Delete consumer groups --delete

DeleteGroupsRequest

Delete a specified consumer group --group


sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090


Delete all consumer groups --all-groups


sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090

PS: A consumer group can only be deleted successfully when all of its clients have stopped consuming or are offline. Otherwise the following exception is reported:

Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

4. Reset the offsets of a consumer group --reset-offsets


The examples below all use --dry-run, which previews the operation and prints the result that would be applied. To actually execute it, replace the parameter with --execute.

The examples below all use the reset mode --to-earliest.

Refer to the reset modes listed further down to switch to another mode as needed.

Reset the offsets of a specified consumer group --group

Reset the offsets of all Topics for the specified consumer group --all-topics

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topics


Reset the offsets of a specified Topic for the specified consumer group --topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2

Reset the offsets of all consumer groups --all-groups

Reset the offsets of all Topics for all consumer groups --all-topics

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-groups --bootstrap-server xxxx:9090 --dry-run --all-topics

Reset the offsets of a specified Topic for all consumer groups --topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-groups --bootstrap-server xxxx:9090 --dry-run --topic test2

--reset-offsets must be followed by a reset mode.

Reset modes

| Parameter | Description | Example |
|---|---|---|
| --to-earliest | Reset the offset to the earliest offset (the earliest one that has not been deleted) | |
| --to-current | Reset the offset to the group's current offset | |
| --to-latest | Reset to the latest offset | |
| --to-datetime | Reset the offset to the one at the specified time. Format: YYYY-MM-DDTHH:mm:SS.sss | --to-datetime "2021-06-26T00:00:00.000" |
| --to-offset | Reset to the specified offset. Usually several partitions are matched, and all of them are reset to this value. 1. If the target's latest offset < --to-offset, the partition is reset to its latest offset. 2. If the target's earliest offset > --to-offset, it is reset to the earliest offset. 3. Otherwise it is reset to the --to-offset value. This mode is rarely used | --to-offset 3465 |
| --shift-by | Shift the offset by the given amount: a positive value moves forward, a negative value moves backward. This also applies to all matched partitions | --shift-by 100, --shift-by -100 |
| --from-file | Reset according to a CSV file; described separately below | |
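The three cases listed for --to-offset amount to clamping the requested value into the range of offsets that still exist in the partition. A minimal sketch of that rule, with a hypothetical helper name (clamp_offset) and made-up offset ranges:

```shell
#!/bin/sh
# Hypothetical helper mirroring the --to-offset rule described in the table:
# a requested offset outside [earliest, latest] is clamped to the nearest bound.
# Arguments: requested offset, earliest offset, latest offset.
clamp_offset() {
  requested=$1
  earliest=$2
  latest=$3
  if [ "$requested" -gt "$latest" ]; then
    echo "$latest"
  elif [ "$requested" -lt "$earliest" ]; then
    echo "$earliest"
  else
    echo "$requested"
  fi
}

clamp_offset 3465 0 1000      # requested offset is above the latest offset
clamp_offset 3465 5000 9000   # requested offset is below the earliest offset
clamp_offset 3465 0 9000      # requested offset is inside the range
```

Each matched partition has its own range, which is why one --to-offset value can end up as different offsets on different partitions.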

--from-file deserves a closer look.

The other modes above reset every matched partition in the same way; they cannot reset each partition to a different offset. --from-file gives us more flexibility.

  1. First prepare a CSV file in the format topic,partition,target offset:

    test2,0,100
    test2,1,200
    test2,2,300

  2. Execute the command:

    sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv
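The CSV file for --from-file can be generated from a list of partition:offset pairs. This is a small sketch, assuming one row per partition in the order topic, partition, offset; the helper name reset_offsets_csv is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: print one "topic,partition,offset" CSV row for each
# partition:offset pair given after the topic name.
reset_offsets_csv() {
  topic=$1
  shift
  for pair in "$@"; do
    printf '%s,%s,%s\n' "$topic" "${pair%%:*}" "${pair#*:}"
  done
}

# Partitions 0, 1 and 2 of test2, each with its own target offset
reset_offsets_csv test2 0:100 1:200 2:300
```

Redirecting the output to a file such as config/reset-offset.csv gives the input for --from-file.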

5. Delete offsets: delete-offsets


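After the offsets are deleted, the consumer group has no committed position for that topic, so the next time it starts, where it begins consuming is decided by its auto.offset.reset setting (from the beginning when set to earliest).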

sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2


Related optional parameters

parameter description example
--bootstrap-server Specifies the Kafka service to connect to --bootstrap-server localhost:9092
--list List all consumer group names --list
--describe Show the details of a consumer group --describe
--group Specify a consumer group
--all-groups Specify all consumer groups
--members Show the members of a consumer group
--state Show the state of a consumer group
--offsets List the offset information when describing a consumer group; this is what --describe shows by default
--dry-run Preview the result of an offset reset without applying it; the reset is only applied with --execute. --dry-run is the default
--execute Actually perform the offset reset
--to-earliest Reset the offset to the earliest offset
--to-latest Reset the offset to the latest offset
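
A preview-then-apply workflow built on these two flags might look like the sketch below. The wrapper name reset_to_earliest_cmd and the localhost:9092 address are assumptions for illustration; the function only prints the command so it can be reviewed before it is run.

```shell
# Hypothetical wrapper: prints a reset command that previews by default.
# Usage: reset_to_earliest_cmd <group> <topic> [execute]
reset_to_earliest_cmd() {
  group=$1
  topic=$2
  mode=--dry-run       # preview only unless told otherwise
  if [ "${3:-}" = "execute" ]; then
    mode=--execute     # actually commit the new offsets
  fi
  echo "sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group $group --topic $topic --bootstrap-server localhost:9092 $mode"
}

reset_to_earliest_cmd test2_consumer_group test2           # step 1: preview
reset_to_earliest_cmd test2_consumer_group test2 execute   # step 2: apply
```

Keeping the preview as the default path means an accidental call never changes committed offsets.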

The attachment

Some optional configurations of ConfigCommand


Topic related optional configuration

key value example
cleanup.policy Cleanup policy
compression.type Compression type (compressing on the producer side is usually recommended)
delete.retention.ms Retention time for delete markers (tombstones) in compacted logs
file.delete.delay.ms
flush.messages Number of messages accumulated before a flush to disk is forced
flush.ms Interval at which a flush to disk is forced
follower.replication.throttled.replicas Follower replicas to throttle; format: partition number:follower broker ID,partition number:follower broker ID 0:1,1:1
index.interval.bytes
leader.replication.throttled.replicas Leader replicas to throttle; format: partition number:leader broker ID 0:0
max.compaction.lag.ms
max.message.bytes Maximum size of a message batch
message.downconversion.enable Whether messages may be down-converted to an older format for older consumers
message.format.version Message format version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas Minimum number of in-sync replicas (ISR)
preallocate
retention.bytes Log retention size (retention is usually limited by time instead)
retention.ms Log retention time
segment.bytes Segment size limit
segment.index.bytes
segment.jitter.ms
segment.ms Time after which a segment is rolled
unclean.leader.election.enable Whether replicas outside the ISR may be elected leader
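
As a rough illustration of how these keys are applied, the sketch below joins key=value pairs into the comma-separated form taken by --add-config and prints a kafka-configs.sh command. join_configs is a hypothetical helper, the topic name test and localhost:9092 are placeholders, and it assumes a Kafka version in which kafka-configs.sh accepts --bootstrap-server for topic entities. The command is printed rather than run.

```shell
# Hypothetical helper: join key=value pairs with commas for --add-config.
join_configs() {
  joined=""
  for pair in "$@"; do
    joined="${joined:+$joined,}$pair"   # add a comma only between pairs
  done
  echo "$joined"
}

configs=$(join_configs retention.ms=600001 retention.bytes=123455)
echo "sh bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name test --add-config $configs"
```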

Broker related optional configuration

key value example
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.message.downconversion.enable
log.message.timestamp.difference.max.ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
replica.alter.log.dirs.io.max.bytes.per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
unclean.leader.election.enable

Users related optional configuration

key value example
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate Throttle the consumer byte rate for the user
producer_byte_rate Throttle the producer byte rate for the user
request_percentage Request percentage quota
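
The byte-rate quotas are expressed in bytes per second, so a small conversion helper can make them easier to read. The following is a sketch under assumptions: mb_to_bytes is a hypothetical helper, user1 and localhost:9092 are placeholders, and it assumes a Kafka version in which kafka-configs.sh can set user quotas through --bootstrap-server. The command is printed rather than run.

```shell
# Hypothetical helper: convert MB/s into the bytes/s value used by the quota keys.
mb_to_bytes() {
  echo $(( $1 * 1024 * 1024 ))
}

producer_rate=$(mb_to_bytes 10)   # 10 MB/s for producing
consumer_rate=$(mb_to_bytes 20)   # 20 MB/s for consuming
echo "sh bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type users --entity-name user1 --add-config producer_byte_rate=$producer_rate,consumer_byte_rate=$consumer_rate"
```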

Clients related optional configurations

key value example
consumer_byte_rate
producer_byte_rate
request_percentage
