1 Kafka client classification and Maven coordinates
1.1 Classification of Kafka clients
- AdminClient: an administrative client for managing and inspecting topics, brokers, and other cluster resources
- Producer: publishes messages to specific topics
- Consumer: subscribes to topics and reads the messages published to them
- Streams API: transforms input streams into output streams
- Connector API: pulls data from a source system or application into Kafka
1.2 Maven coordinates
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
2 Learning AdminClient
2.1 Creating an AdminClient
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.Test;

import java.util.Properties;

public class CreateAdminClientTest {

    @Test
    public void test() {
        Properties prop = new Properties();
        // Set the Kafka broker address; separate multiple addresses with commas
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(prop);
        // Release the client's network resources once it is no longer needed
        adminClient.close();
    }
}
AdminClientConfig is a class of constants that holds the names of the admin client's configuration parameters. Refer to it when you need to know which parameters the client accepts.
BOOTSTRAP_SERVERS_CONFIG, for example, is the key bootstrap.servers, which sets the address of the Kafka brokers to connect to. The parameters and their values for a client configured this way look like this:
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
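The parameter names in this listing are plain strings, so the same settings can also be assembled without the AdminClientConfig constants. Below is a minimal sketch, assuming two hypothetical brokers (broker1:9092 and broker2:9092) and an illustrative 30-second request timeout. The class and method names are made up for this example; only the keys bootstrap.servers and request.timeout.ms come from the listing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

final class AdminConfigSketch {
    // String values behind AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG
    // and AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    static final String REQUEST_TIMEOUT_MS = "request.timeout.ms";

    // Hypothetical helper: joins the broker addresses with commas,
    // which is the format the bootstrap.servers parameter expects.
    static Properties adminProperties(List<String> brokers, int requestTimeoutMs) {
        Properties props = new Properties();
        props.setProperty(BOOTSTRAP_SERVERS, String.join(",", brokers));
        props.setProperty(REQUEST_TIMEOUT_MS, Integer.toString(requestTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Broker host names are placeholders, not real machines
        Properties props = adminProperties(Arrays.asList("broker1:9092", "broker2:9092"), 30000);
        System.out.println(props.getProperty(BOOTSTRAP_SERVERS));  // broker1:9092,broker2:9092
        System.out.println(props.getProperty(REQUEST_TIMEOUT_MS)); // 30000
    }
}
```

The resulting Properties object can be passed to AdminClient.create(...) in the same way as in section 2.1.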
2.2 Creating a topic
package study.wyy.kafka.java.admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicTest {

    private AdminClient client;

    @Before
    public void createClient() {
        Properties prop = new Properties();
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        client = AdminClient.create(prop);
    }

    @After
    public void closeClient() {
        client.close();
    }

    @Test
    public void testCreate() throws ExecutionException, InterruptedException {
        short rs = 1;
        /*
         * Parameter 1: topic name
         * Parameter 2: number of partitions
         * Parameter 3: replication factor
         */
        NewTopic newTopic = new NewTopic("java-api-study", 1, rs);
        // createTopics takes a collection of NewTopic objects, so several topics can be created in one call
        CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
        // Block until the broker has finished creating the topic, so the client
        // is not closed while the request is still in flight
        result.all().get();
        // Print the number of partitions the topic was created with
        System.out.println(result.numPartitions("java-api-study").get());
    }
}
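Waiting on the result is more reliable than pausing the thread for a fixed interval, because the request may take more or less time than the pause. The futures returned by AdminClient (KafkaFuture) implement java.util.concurrent.Future, so the wait can also be bounded with a timeout. The sketch below shows the idea with a hypothetical await helper. It uses a CompletableFuture as a stand-in for the broker's reply so that it runs without a Kafka cluster; the helper name and the timeout values are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitSketch {
    // Hypothetical helper: wait for an asynchronous result, but never longer than timeoutMs.
    // Throws TimeoutException if the result does not arrive in time.
    static <T> T await(Future<T> future, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a broker reply that arrives asynchronously after about 50 ms
        CompletableFuture<Integer> partitions = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 1;
        });
        // Returns as soon as the value is available, well before the 5 s limit
        System.out.println(await(partitions, 5000));
    }
}
```

With a real client, the same helper could be given a future such as the one returned by CreateTopicsResult.all().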
2.3 Listing topics
- Listing without options
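// Add to TopicTest. Additional imports: org.apache.kafka.clients.admin.ListTopicsResult,
// org.apache.kafka.clients.admin.TopicListing, org.apache.kafka.common.KafkaFuture,
// java.util.Collection, java.util.Set, java.util.concurrent.ExecutionException
@Test
public void testList() throws ExecutionException, InterruptedException {
    ListTopicsResult listTopicsResult = client.listTopics();
    // Get the topic names
    KafkaFuture<Set<String>> names = listTopicsResult.names();
    names.get().forEach(System.out::println);
    System.out.println("========================");
    // Get the listings (name plus internal flag)
    KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
    listings.get().forEach(System.out::println);
}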
Output:
java-api-study
========================
(name=java-api-study, internal=false)
- Listing with options
// Add to TopicTest. Additional import beyond those of testList:
// org.apache.kafka.clients.admin.ListTopicsOptions
@Test
public void testList2() throws ExecutionException, InterruptedException {
    ListTopicsOptions options = new ListTopicsOptions();
    // Include Kafka's internal topics in the result
    options.listInternal(true);
    ListTopicsResult listTopicsResult = client.listTopics(options);
    // Get the topic names
    KafkaFuture<Set<String>> names = listTopicsResult.names();
    names.get().forEach(System.out::println);
    System.out.println("========================");
    // Get the listings (name plus internal flag)
    KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
    listings.get().forEach(System.out::println);
}
Output:
java-api-study
__consumer_offsets
========================
(name=java-api-study, internal=false)
(name=__consumer_offsets, internal=true)
__consumer_offsets is an internal Kafka topic that records the offsets committed by consumers, that is, how far each consumer group has read.
2.4 Deleting a topic
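// Add to TopicTest. Additional imports: org.apache.kafka.clients.admin.DeleteTopicsResult,
// java.util.Collection, java.util.concurrent.ExecutionException
@Test
public void testDel() throws ExecutionException, InterruptedException {
    Collection<String> topics = Arrays.asList("java-api-study");
    DeleteTopicsResult deleteTopicsResult = client.deleteTopics(topics);
    // Block until the deletion request has completed (or failed)
    deleteTopicsResult.all().get();
}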
2.5 Describing a topic
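// Add to TopicTest. Additional imports: org.apache.kafka.clients.admin.DescribeTopicsResult,
// org.apache.kafka.clients.admin.TopicDescription, java.util.Collection, java.util.Map,
// java.util.concurrent.ExecutionException
@Test
public void testDescribeTopics() throws ExecutionException, InterruptedException {
    Collection<String> topics = Arrays.asList("java-api-study");
    DescribeTopicsResult describeTopicsResult = client.describeTopics(topics);
    // all() completes with a map from topic name to its description
    Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();
    descriptionMap.forEach((k, v) -> {
        System.out.println("topicName: " + k);
        System.out.println("topicDesc: " + v);
    });
}
Output: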
topicName: java-api-study
topicDesc: (name=java-api-study,
internal=false,
partitions=(partition=0, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)), authorizedOperations=[])