Kafka client classification and Maven coordinates

1.1 Classification of Kafka clients

  1. AdminClient: an administrative client that manages and monitors topics, brokers, and other Kafka objects

  2. Producer: publishes messages to specific topics

  3. Consumer: subscribes to topics and consumes the messages published to them

  4. Streams API: efficiently transforms input streams into output streams

  5. Connector API: pulls data from an external source system or application into Kafka
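To make the Producer role above concrete, here is a minimal sketch. The broker address localhost:9092 and the topic java-api-study match the examples later in this article; actually sending requires a running broker.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class MinimalProducer {

    // Build the minimal configuration a producer needs:
    // broker address plus key/value serializers.
    static Properties producerProps(String bootstrap) {
        Properties prop = new Properties();
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return prop;
    }

    public static void main(String[] args) {
        // Requires a broker running at localhost:9092
        try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(producerProps("localhost:9092"))) {
            // Publish one message to the topic "java-api-study"
            producer.send(new ProducerRecord<>("java-api-study", "key", "hello kafka"));
        }
    }
}
```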

1.2 Maven coordinates

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

2 AdminClient learning

2.1 Creating an AdminClient

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.Test;
import java.util.Properties;

public class CreateAdminClientTest {

    @Test
    public void test() {
        Properties prop = new Properties();
        // Set the Kafka address; separate multiple addresses with commas
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(prop);
    }
}

AdminClientConfig is a constants class that defines the names of Kafka's client configuration parameters; refer to it to see which parameters can be configured.

For example, BOOTSTRAP_SERVERS_CONFIG ("bootstrap.servers") is the Kafka broker address.

The full list of AdminClient configuration parameters and their defaults:

bootstrap.servers = [localhost:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
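Any of these defaults can be overridden through the AdminClientConfig constants before the client is created. A small sketch; the timeout and client-id values here are arbitrary assumptions, not recommendations:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;

public class TunedAdminClient {

    static Properties adminProps() {
        Properties prop = new Properties();
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Override two of the defaults listed above
        prop.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); // default 120000
        prop.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, "admin-demo");     // default empty
        return prop;
    }

    public static void main(String[] args) {
        // AdminClient is AutoCloseable, so try-with-resources closes it cleanly
        try (AdminClient client = AdminClient.create(adminProps())) {
            // use client here ...
        }
    }
}
```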

2.2 Creating a topic

package study.wyy.kafka.java.admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicTest {

    private AdminClient client;

    @Before
    public void createClient() {
        Properties prop = new Properties();
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        client = AdminClient.create(prop);
    }

    @Test
    public void testCreate() throws ExecutionException, InterruptedException {
        short replicationFactor = 1;
        /**
         * Parameter 1: topic name
         * Parameter 2: number of partitions
         * Parameter 3: replication factor
         */
        NewTopic newTopic = new NewTopic("java-api-study", 1, replicationFactor);
        // createTopics accepts a collection of NewTopics, so several topics can be created at once
        CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
        // Avoid closing the client before the topic has been created
        Thread.sleep(500);
        // Get the number of partitions set for the topic
        System.out.println(result.numPartitions("java-api-study").get());
    }
}
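Instead of sleeping, the creation can be awaited explicitly: CreateTopicsResult exposes KafkaFuture objects, and all().get() blocks until the broker has answered. A sketch; the handling of TopicExistsException for re-runs is an assumption, not part of the original example:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

public class CreateTopicBlocking {

    static NewTopic topic() {
        // name, number of partitions, replication factor - same values as above
        return new NewTopic("java-api-study", 1, (short) 1);
    }

    static void create(AdminClient client) throws InterruptedException {
        try {
            // all().get() blocks until creation completes, so no Thread.sleep is needed
            client.createTopics(Collections.singleton(topic())).all().get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                System.out.println("topic already exists, nothing to do");
            } else {
                throw new RuntimeException(e.getCause());
            }
        }
    }
}
```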

2.3 Querying topics

  1. Unconditional query
@Test
public void testList() throws ExecutionException, InterruptedException {
    ListTopicsResult listTopicsResult = client.listTopics();
    // Get the topic names
    KafkaFuture<Set<String>> names = listTopicsResult.names();
    names.get().forEach(System.out::println);

    System.out.println("========================");
    KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
    listings.get().forEach(System.out::println);
}

Output:

java-api-study
========================
(name=java-api-study, internal=false)
  2. Conditional query
@Test
public void testList2() throws ExecutionException, InterruptedException {
    ListTopicsOptions options = new ListTopicsOptions();
    // Include Kafka's internal (built-in) topics as well
    options.listInternal(true);
    ListTopicsResult listTopicsResult = client.listTopics(options);
    // Get the topic names
    KafkaFuture<Set<String>> names = listTopicsResult.names();
    names.get().forEach(System.out::println);

    System.out.println("========================");
    KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
    listings.get().forEach(System.out::println);
}

Output:

java-api-study
__consumer_offsets
========================
(name=java-api-study, internal=false)
(name=__consumer_offsets, internal=true)

__consumer_offsets: a topic built into Kafka that records the offsets of consumed messages

2.4 Deleting a topic

@Test
public void testDel() throws ExecutionException, InterruptedException {
    Collection<String> topics = Arrays.asList("java-api-study");
    DeleteTopicsResult deleteTopicsResult = client.deleteTopics(topics);
}
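deleteTopics is also asynchronous, so a deletion failure can pass silently if the returned DeleteTopicsResult is never inspected. A sketch that blocks on the result; the handling of UnknownTopicOrPartitionException is an assumption:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

public class DeleteTopicBlocking {

    static Collection<String> topics() {
        return Collections.singleton("java-api-study");
    }

    static void delete(AdminClient client) throws InterruptedException {
        try {
            // Block until the broker confirms the deletion
            client.deleteTopics(topics()).all().get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                System.out.println("topic does not exist");
            } else {
                throw new RuntimeException(e.getCause());
            }
        }
    }
}
```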

2.5 Querying topic details

@Test
public void testDescribeTopics() throws ExecutionException, InterruptedException {
    Collection<String> topics = Arrays.asList("java-api-study");
    DescribeTopicsResult describeTopicsResult = client.describeTopics(topics);
    Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();
    descriptionMap.forEach((k, v) -> {
        System.out.println("topicName: " + k);
        System.out.println("topicDesc: " + v);
    });
}
topicName: java-api-study
topicDesc: (name=java-api-study, 
internal=false, 
partitions=(partition=0, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)), authorizedOperations=[])
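The TopicDescription printed above can also be unpacked programmatically: partitions() returns one TopicPartitionInfo per partition, carrying the leader, replica list, and in-sync replicas shown in the output. A sketch, reusing the topic name from the earlier examples:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

public class DescribePartitions {

    // Render one partition's info in the shape seen in the output above
    static String format(TopicPartitionInfo p) {
        return "partition " + p.partition()
                + " leader=" + p.leader()
                + " replicas=" + p.replicas().size()
                + " isr=" + p.isr().size();
    }

    // Requires a running broker; fetches the description and prints each partition
    static void printPartitions(AdminClient client) throws ExecutionException, InterruptedException {
        TopicDescription desc = client.describeTopics(Collections.singleton("java-api-study"))
                .all().get()
                .get("java-api-study");
        for (TopicPartitionInfo p : desc.partitions()) {
            System.out.println(format(p));
        }
    }
}
```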