General documentation: article directory — GitHub: github.com/black-ant

1. Preface

After a brief analysis of RabbitMQ, this article takes a look at Kafka.

2. Basic use

Kafka is a very common high-performance message queue. Let's start with the basics.

Maven Core Dependencies

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configuration information

spring.kafka.bootstrap-servers=127.0.0.1:9092
# Specifies the default consumer group id
spring.kafka.consumer.group-id=ant
spring.kafka.consumer.auto-offset-reset=earliest
# Serializers for the producer's message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Deserializers for the consumer's message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.listener.missing-topics-fatal=false

Production of the message

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void produce() {
        kafkaTemplate.send("start", "one", "are you ok?");
    }
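Since send() is asynchronous, the returned ListenableFuture can carry the result back. Below is a minimal sketch of attaching a callback, reusing the "start" topic from above; the service and method names are illustrative, not part of the original demo:

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Illustrative sketch: observing the asynchronous send result via the returned ListenableFuture
@Service
public class KafkaProducerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void produceWithCallback() {
        kafkaTemplate.send("start", "one", "are you ok?")
                .addCallback(
                        result -> logger.info("sent to {}-{}@{}",
                                result.getRecordMetadata().topic(),
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset()),
                        ex -> logger.error("send failed", ex));
    }
}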

Message consumption

@KafkaListener(id = "one", topics = "start", clientIdPrefix = "myClientId")
public void listener0(ConsumerRecord<String, String> record) {
    logger.info("------> this is in listerner 0:{}<-------", record.value());
}

Conclusion

  • KafkaTemplate publishes the message
  • KafkaListener listens for messages

Overall, this is consistent with the RabbitMQ usage covered previously. The internals will be examined in more detail in the source-code analysis below.

3. Basics

For the basic members and concepts, refer to the message queue documentation.

4. Source code analysis

We will analyze two classes, KafkaTemplate and KafkaListener.

4.1 KafkaTemplate

Starting point of sending

As you can see, the send result is delivered through a ListenableFuture:

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    final Producer<K, V> producer = getTheProducer();
    // The future is completed later by the callback built below
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    producer.send(producerRecord, buildCallback(producerRecord, producer, future));
    if (this.autoFlush) {
        flush();
    }
    return future;
}

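Because doSend returns the future immediately, a caller that needs synchronous semantics can simply block on it. A small sketch (the timeout value is arbitrary, and the topic/key reuse the earlier example):

import java.util.concurrent.TimeUnit;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

// Sketch: turning the asynchronous send into a synchronous one by blocking on the future
public SendResult<String, String> sendAndWait(KafkaTemplate<String, String> template) throws Exception {
    return template.send("start", "one", "are you ok?").get(10, TimeUnit.SECONDS);
}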

Message creation

The main logic is completed through the org.apache.kafka.clients.producer.KafkaProducer object. This is a native Kafka class that belongs to kafka-clients.

// Step 1: build the message
C0- KafkaProducer
    M_01- send(ProducerRecord<K, V> record, Callback callback)
        - ProducerInterceptors -> PS:M_01_01
        ? - note that additional functionality is added here mainly through interceptors, inside a for loop
            | - the loop iterates over List<ProducerInterceptor<K, V>>
            - each interceptor's onSend(interceptRecord) is called in turn by ProducerInterceptors
            - if the collection is empty, go straight to M_02
    M_02- doSend
        1- Determine whether the Sender object exists and is running, otherwise raise an exception -> PS:M_02_02
        2- Call waitOnMetadata to determine whether the cluster metadata is available
        3- Create the Cluster object
        4- Build serializedKey and serializedValue
        5- Select the partition -> PS:M_02_03
        6- Generate the final send target TopicPartition
        7- Build the Header and set the readOnly property
        8- Check the message size
        9- Build the callback interceptor -> PS:M_02_05
        10- Append the message, getting a RecordAccumulator.RecordAppendResult for asynchronously fetching the result
            ? - it contains a Future object
            ? - RecordAccumulator.append is what records the message for sending

// M_02 code
 private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // First, make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                // Internal judgment logic:
                // Build the metadata and initiate metadata.fetch() to determine the timeout
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw ...; // SerializationException for the key (construction omitted)
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw ...; // SerializationException for the value (construction omitted)
            }
            // Select the partition and build the final send target
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            // The producer callback ensures that both 'callback' and the interceptor callbacks are called
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
            // Transaction management
            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);

            // Append the message and return a RecordAppendResult object
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                this.sender.wakeup();
            }
            return result.future;
            // handle exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (Exception e) {
            // ... exception handling is omitted
            // ApiException / InterruptedException / BufferExhaustedException / KafkaException / Exception
            // this.errors.record();
            // this.interceptors.onSendError(record, tp, e);
        }
    }

// Step 2: sending the message -- see below


PS:M_01_01 ProducerRecord object

Function: each ProducerRecord is one message. The object carries the topic, partition, and headers used to route the send.

public class ProducerRecord<K, V> {

    // The topic the record is sent to
    private final String topic;

    // If a valid partition number is specified, that partition will be used when sending the record
    // If there is no partition but a key is specified, the hash of the key is used to select the partition
    // If neither key nor partition is given, partitions are assigned in a round-robin manner
    private final Integer partition;
    private final Headers headers;

    // The message key (used for partition selection)
    private final K key;
    // The body of the message being sent
    private final V value;

    // If the user does not provide a timestamp, the producer stamps the record with the current time.
    // The timestamp eventually used by Kafka depends on the timestamp type configured for the topic.
    private final Long timestamp;
    // ...
}
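To make the routing rules above concrete, here is a sketch of building a record explicitly (topic, key, and header values are illustrative); with a key and no explicit partition, the key hash decides the partition:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

// Sketch: key "one" with no explicit partition means the partition is chosen by hashing the key
ProducerRecord<String, String> record = new ProducerRecord<>("start", "one", "are you ok?");
record.headers().add(new RecordHeader("traceId", "abc-123".getBytes(StandardCharsets.UTF_8)));
kafkaTemplate.send(record);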

PS: M_02_02 Sender object

The core send object; it handles the actual network I/O with the cluster. Package: org.apache.kafka.clients.producer.internals.

TODO


PS:M_02_03 Partition selection core logic

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        // Use the explicitly specified partition if present; otherwise delegate to the partitioner
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }


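For reference, the key-hash branch that the default partitioner applies when no partition is given — a simplified sketch of DefaultPartitioner's behavior, not the full implementation (the real class also handles keyless records with round-robin / sticky logic):

import org.apache.kafka.common.utils.Utils;

// Simplified sketch: how a non-null key maps to a partition in the default partitioner
static int partitionForKey(byte[] serializedKey, int numPartitions) {
    // murmur2 is the hash the real partitioner uses; the mask keeps the index non-negative
    return (Utils.murmur2(serializedKey) & 0x7fffffff) % numPartitions;
}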

PS:M_02_05 InterceptorCallback role

InterceptorCallback wraps the user-supplied callback together with the registered ProducerInterceptors, so that each interceptor's onAcknowledgement() is invoked before the user callback when the broker response arrives.

Sending a message

Generally speaking, sending a message involves three lines:

  • Line 1: when the project starts, the Sender's run loop begins to cycle
    • pollSelectionKeys eventually completes the channel write
  • Line 2: setting up the send
    • Sender # runOnce -> sendProducerData(currentTimeMs) builds the ClientRequest
  • Line 3: performing the send
    • Sender # runOnce -> client.poll(pollTimeout, currentTimeMs)

// PS: the message is created and sent asynchronously; the actual sending is done mainly by the Sender

public class Sender implements Runnable { ... }

// Step 1: thread run
C2- Sender
    M2_01- runOnce()
        - client.poll(pollTimeout, currentTimeMs) performs the send
    M2_02- sendProduceRequest : core sending method
        - builds a RequestCompletionHandler whose onComplete drives the subsequent callbacks
        - builds a ClientRequest
        - calls client.send to hand the request over ? - NetworkClient.doSend -> PS:M2_02_01

// PS:M2_02_01
    public void send(Send send) {
        String connectionId = send.destination();
        KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
        if (closingChannels.containsKey(connectionId)) {
            // Ensure the notification is delivered via 'disconnected', leaving the channel in the closing state
            this.failedSends.add(connectionId);
        } else {
            try {
                // The KafkaChannel takes over the processing from here
                channel.setSend(send);
            } catch (Exception e) {
                // Update the state for consistency; the channel will be discarded after closing
                channel.state(ChannelState.FAILED_SEND);
                // Ensure notification via 'disconnected' when 'failedSends' are processed in the next poll
                this.failedSends.add(connectionId);
                close(channel, CloseMode.DISCARD_NO_NOTIFY);
                if (!(e instanceof CancelledKeyException)) {
                    throw e;
                }
            }
        }
    }

// Inside the Selector: the Send set in M2_02 above is actually written out here
C- Selector
    M- pollSelectionKeys
        - send = channel.write()
       


Message callback operation

The callback is initiated from Sender # handleProduceResponse, which is registered in the sendProduceRequest method.


// Start: return processing of the request
C- Sender # handleProduceResponse
// Core code overview
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
    TopicPartition tp = entry.getKey();
    ProduceResponse.PartitionResponse partResp = entry.getValue();
    ProducerBatch batch = batches.get(tp);
    completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
}
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());

// End: template callback
// This callback is set in KafkaTemplate.doSend
C- KafkaTemplate
    M- doSend sets the callback
        - producer.send(producerRecord, buildCallback(producerRecord, producer, future));
    M- buildCallback
        - builds the Callback that completes the SettableListenableFuture returned by doSend

// PS: request flow to Callback
C- Sender # handleProduceResponse
C- Sender # completeBatch
C- ProducerBatch # done
C- ProducerBatch # completeFutureAndFireCallbacks
C- InterceptorCallback # onCompletion 
// The End: template callback is called here

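To see this callback chain end-to-end without Spring, here is a sketch using the native client directly; the Callback passed to send() is exactly what InterceptorCallback wraps, and it fires once handleProduceResponse -> completeBatch -> completeFutureAndFireCallbacks runs. The broker address and topic reuse the earlier configuration; the class name is illustrative:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch: a native-client send whose Callback is invoked from the Sender I/O thread
public class NativeProducerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("start", "one", "are you ok?"),
                    (metadata, exception) -> {
                        // Runs after the produce response has been handled and the batch completed
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.printf("acked %s-%d@%d%n",
                                    metadata.topic(), metadata.partition(), metadata.offset());
                        }
                    });
        }
    }
}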

Transaction management

TransactionSynchronizationManager
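The original leaves this section as a stub. As a pointer for experimentation, spring-kafka exposes producer transactions through KafkaTemplate#executeInTransaction — a minimal sketch, assuming spring.kafka.producer.transaction-id-prefix has been configured (without it, this call fails):

// Sketch: both sends commit or abort together; requires a transactional producer factory
kafkaTemplate.executeInTransaction(template -> {
    template.send("start", "one", "in-tx-1");
    template.send("start", "one", "in-tx-2");
    return true;
});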

4.2 KafkaListener

At the heart of the Kafka listener is KafkaListenerAnnotationBeanPostProcessor; initialization is done through the BeanPostProcessor callback postProcessAfterInitialization.

The Kafka consumption process involves several routes:

  • Container registration
  • The container's listening loop
  • Message consumption

Container registration

C3- KafkaListenerAnnotationBeanPostProcessor
    M3_01- postProcessAfterInitialization
        ? - invoked from the bean factory's applyBeanPostProcessorsAfterInitialization, a post-initialization extension point for every bean
        - obtains the class-level @KafkaListener annotations
        - uses MethodIntrospector to reflectively find the @KafkaListener annotations on each method
        - note: the result is a Set, meaning a single method may carry more than one @KafkaListener annotation -> PS:M3_01_01
        - processKafkaListener
    M3_02- processKafkaListener
        - determines whether the method is a proxy method -> M3_03
        - creates a MethodKafkaListenerEndpoint node and sets the current method on it
        - sets the related properties on the node
          ? - Bean / MessageHandlerMethodFactory / Id / GroupId / TopicPartitions / Topics / TopicPattern / ClientIdPrefix
        - calls processListener -> M3_04
    M3_03- checkProxy(Method methodArg, Object bean)
        - gets the current method
        - if the bean is a JDK dynamic proxy, looks up the matching method from the proxied interfaces and returns it
    M3_04- processListener
        - continues building the MethodKafkaListenerEndpoint
          ? - Bean / MessageHandlerMethodFactory / Id / GroupId / TopicPartitions / Topics / TopicPattern / ClientIdPrefix
        - handles concurrency / autoStartup / group according to the conditions
        - obtains the KafkaListenerContainerFactory through the BeanFactory
        - sets properties on the MethodKafkaListenerEndpoint
          ? - BeanFactory / ErrorHandler
        - calls registrar.registerEndpoint with the current endpoint
          ? - the endpoint is consumed later in afterSingletonsInstantiated
    M3_05- afterSingletonsInstantiated
        - sets the various attributes on the registrar
        - registerAllEndpoints then registers every stored endpoint, and resolveContainerFactory returns a KafkaListenerContainerFactory

// M3_01 core code
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            // Obtain the class-level @KafkaListener annotations
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<>();
            // Get the method-level annotations
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        // Core processing logic
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }
         
         
// M3_03 core pseudocode:
private Method checkProxy(Method methodArg, Object bean) {
	Method method = methodArg;
	if (AopUtils.isJdkDynamicProxy(bean)) {
		method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
		Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
		for (Class<?> iface : proxiedInterfaces) {
			method = iface.getMethod(method.getName(), method.getParameterTypes());
			break;
		}
	}
	return method;
}

// Manually inject the Listener
// Accidentally found a KafkaListenerConfigurer interface in the code that can be used to manually inject the ListenerI - KafkaListenerConfigurer M - configureKafkaListeners (KafkaListenerEndpointRegistrar registrar) - built a MessageListenerContainer/ / M4_01 trigger process: the afterPropertiesSet, again upwards for DefaultListableBeanFactory # afterSingletonsInstantiatedC4- KafkaListenerEndpointRegistrar F4_01- List<KafkaListenerEndpointDescriptor> endpointDescriptors F4_02- Map<String, 's MessageListenerContainer > listenerContainers M4_01 registerEndpoint - build a KafkaListenerEndpointDescriptor - If startImmediately is initialized and started,synchronizedRegistered locked into the container - or to add to the List < KafkaListenerEndpointDescriptor > collection - > M4_02 M4_02 registerAllEndpoints -synchronizedM4_04 M4_03 -resolvecontainerFactory: Returns a KafkaListenerContainerFactory M4_04 registerListenerContainer - build a's MessageListenerContainer, and join the Map < String, MessageListenerContainer> ? - createListenerContainer registers Method - if Group is included, join List<MessageListenerContainer> collection? - This collection will be called in the start(inherited from Lifecycle) method -> M4_05 M4_05- startIfNecessary - call MessageListenerContainer start method M4_06- CreateListenerContainer -m4_07 -start FOR- getListenerContainers Get all messagelistenerContainers, Call startIfNecessary in turn? - M4_05 C05 -'s MessageListenerContainer M5_01 - start () - C06 - AbstractMessageListenerContainer M6_01 - doStart () - the final call KafkaMessageListenerContainer M6_02 - the run () - here by a While loop processing - > M6_03 - pollAndInvoke () - invokeListener (records) Reflective processing C07 - KafkaMessageListenerContainer M7_01 - doStart () - prepare ContainerProperties - check AckMode mode - build GenericMessageListener - Build ListenerConsumer - Modify container state? - This is associated with the next loop listener - this is also using a CountDownLatch to wait for execution - start the thread execution, ListenConsumer -> PS:M7_01_02 M7_02- doInvokeOnMessage The final call is M8_1 M7_03- onMessage() -/ / M7_01 code
protected void doStart() {
    if (isRunning()) {
        return;
    }
    if (this.clientIdSuffix == null) { // stand-alone container
        checkTopics();
    }
    // Prepare ContainerProperties
    ContainerProperties containerProperties = getContainerProperties();
    checkAckMode(containerProperties);

    Object messageListener = containerProperties.getMessageListener();
    if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }
    GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
    ListenerType listenerType = determineListenerType(listener);
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    // Modify the container state
    setRunning(true);
    this.startLatch = new CountDownLatch(1);
    // Start the ListenerConsumer thread here
    this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor()
            .submitListenable(this.listenerConsumer);
    try {
        if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(), TimeUnit.MILLISECONDS)) {
            publishConsumerFailedToStart();
        }
    }
    catch (@SuppressWarnings(UNUSED) InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

C8- ConcurrentMessageListenerContainer
    M8_01- doStart()
        - checks the topics
        - starts all KafkaMessageListenerContainers

C10- MethodKafkaListenerEndpoint
    M10_01- createMessageListener
        - messageListener.setHandlerMethod registers the Method

PS:M3_01_01 multiple KafkaListener annotations

kafkaTemplate.send("start"."one"."are you ok one?" + "--");
kafkaTemplate.send("topic1"."two"."are you ok two?" + "--");


@KafkaListener(id = "one", topics = "start", clientIdPrefix = "myClientId")
@KafkaListener(id = "two", topics = "start", clientIdPrefix = "myClientId")
public void listener0(ConsumerRecord
        record) {
    logger.info("------> this is in listerner 0:{}<-------", record.value());
}

/ / the result
2021-05-04 22:04:35.094  INFO 20012 --- [      two-0-C-1] c.g.k.demo.service.KafkaConsumerService  : ------> this is in listerner 0:are you ok two? ----<-------2021-05-04 22:04:35.098  INFO 20012 --- [      one-0-C-1] c.g.k.demo.service.KafkaConsumerService  : ------> this is in listerner 0:are you ok one? ----<-------Copy the code

PS:M7_01_02 Execution thread

The container's listening loop

The main logic lives in ListenerConsumer, an inner class of KafkaMessageListenerContainer:

  • C- ListenerConsumer # run
  • C- KafkaMessageListenerContainer # pollAndInvoke
  • C- KafkaMessageListenerContainer # invokeOnMessage
  • C- KafkaMessageListenerContainer # doInvokeOnMessage
    • This finally delegates to the corresponding MessagingMessageListenerAdapter for execution

// Step 1: Starting point
C- KafkaMessageListenerContainer.ListenerConsumer
    public void run() {
        publishConsumerStartingEvent();
        this.consumerThread = Thread.currentThread();
        if (this.consumerSeekAwareListener != null) {
            this.consumerSeekAwareListener.registerSeekCallback(this);
        }
        KafkaUtils.setConsumerGroupId(this.consumerGroupId);
        this.count = 0;
        this.last = System.currentTimeMillis();
        initAssignedPartitions();
        publishConsumerStartedEvent();
        while (isRunning()) {
            try {
                // Execute the invoke logic
                pollAndInvoke();
            }
            catch (@SuppressWarnings(UNUSED) WakeupException e) {
                // ... exception handling omitted
            }
        }
        wrapUp();
    }
    
// Step 2: the poll loop
C- ListenerConsumer
    M- pollAndInvoke() : core method; polls records and dispatches them to the relevant methods
        - ConsumerRecords<K, V> records = doPoll()
            | - this.consumer.poll(this.pollTimeout)
        - invokeListener(records)
    M- doPoll
        
    
// Step 3: in doInvokeWithRecords, the messages are iterated over
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
    final ConsumerRecord<K, V> record = iterator.next();
    doInvokeRecordListener(record, null, iterator);
}


// Step 4: in doInvokeOnMessage, the corresponding ListenerAdapter is called to complete message processing
// PS: the listener is created in the KafkaMessageListenerContainer constructor
C- KafkaMessageListenerContainer.ListenerConsumer

// The listener to invoke is selected according to the four listener types:
// ACKNOWLEDGING_CONSUMER_AWARE, CONSUMER_AWARE, ACKNOWLEDGING, SIMPLE

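To illustrate the ACKNOWLEDGING flavor from the list above — a sketch of a listener that commits offsets manually; it assumes enable-auto-commit is set to false and the container factory's AckMode is MANUAL (neither is in the configuration shown earlier):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

// Sketch: an ACKNOWLEDGING listener; the Acknowledgment parameter switches the listener type
@Service
public class ManualAckService {

    @KafkaListener(id = "manualAck", topics = "start")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("got: " + record.value());
        ack.acknowledge(); // commit the offset for this record manually
    }
}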

Final execution processing

Execution finally reaches the Handler held by MessagingMessageListenerAdapter; the Handler is injected in KafkaListenerEndpointRegistrar.

// The class that finally executes
C08- RecordMessagingMessageListenerAdapter
    M8_1- onMessage
        - obtains the corresponding Method and invokes it reflectively ? - via the HandlerAdapter
        - the adapter is wired up in MethodKafkaListenerEndpoint # createMessageListener
    M10_1- onMessage

5. Kafka in depth

TODO: The next article will look at how Kafka implements its functionality, node by node

FAQ

java.nio.file.FileSystemException -> the file is being used by another program and cannot be accessed by the process

Problem details: D:\tmp\kafka-logs\topic_1-0\00000000000000000000.timeindex : the process cannot access the file because it is being used by another program. Solution: manually delete the log files under \kafka-logs and restart Kafka.

Conclusion

This article mainly covers two parts: sending and listening. Sending ultimately relies on client.poll, while the core of listening is annotation scanning plus a while loop in which the Consumer polls for records.

Once this part of the flow is understood, you can dig into the details of how it works and customize it for more advanced features such as clustering.