1 Overall process
(1) Custom message interceptors, rarely needed in practice.
(2) Synchronously wait to 'pull metadata'. The first time the producer sends to a topic it has to pull the metadata, which is loaded lazily. What it pulls is the cluster information: a Cluster holds the topic-broker-partition information of the cluster.
(3) Serialize the key and value into byte[] arrays.
(4) Let the Partitioner calculate, from the key and value, which partition the record goes to.
(5) Check the message size, which may not exceed the single-request limit (max.request.size) or the buffer size (buffer.memory).
(6) Bind the message callback function and the interceptor callback function.
(7) 'Send the message to the Accumulator'.
(8) Wake up the sender thread. If the batch is full, or a new batch was created, there is a batch ready to send, so the sender thread is woken up.
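Steps (3) to (8) can be sketched in plain Java to make the flow concrete. This is a minimal model under stated assumptions, not Kafka's code: SketchProducer and SketchAccumulator are hypothetical names, batches are counted in records rather than bytes (Kafka's batch.size is in bytes), the partition hash is Arrays.hashCode rather than the murmur2 hash that Kafka's default partitioner applies to keyed records, and steps (1), (2) and (6) are left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of doSend steps (3)-(8); not Kafka's real classes.
class SketchAccumulator {
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchSize; // assumption: counted in records, Kafka's batch.size is in bytes
    private final Map<String, Deque<List<byte[]>>> batches = new HashMap<>();

    SketchAccumulator(int batchSize) { this.batchSize = batchSize; }

    // Step (7): append to the last open batch of this partition, or start a new one.
    AppendResult append(String topicPartition, byte[] value) {
        Deque<List<byte[]>> queue = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        List<byte[]> last = queue.peekLast();
        boolean created = false;
        if (last == null || last.size() >= batchSize) {
            last = new ArrayList<>();
            queue.addLast(last);
            created = true;
        }
        last.add(value);
        return new AppendResult(last.size() >= batchSize, created);
    }
}

class SketchProducer {
    private final int numPartitions;
    private final int maxRequestSize;
    private final SketchAccumulator accumulator;
    int senderWakeups = 0; // how many times step (8) would wake the sender thread

    SketchProducer(int numPartitions, int maxRequestSize, int batchSize) {
        this.numPartitions = numPartitions;
        this.maxRequestSize = maxRequestSize;
        this.accumulator = new SketchAccumulator(batchSize);
    }

    int send(String topic, String key, String value) {
        // Step (3): serialize key and value into byte[].
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
        // Step (4): choose a partition from the key.
        int partition = partition(keyBytes);
        // Step (5): reject records larger than the per-request limit.
        int size = keyBytes.length + valueBytes.length;
        if (size > maxRequestSize) {
            throw new IllegalArgumentException("record of " + size + " bytes exceeds " + maxRequestSize);
        }
        // Step (7): hand the record to the accumulator.
        SketchAccumulator.AppendResult result = accumulator.append(topic + "-" + partition, valueBytes);
        // Step (8): wake the sender if a batch is full or a new batch was created.
        if (result.batchIsFull || result.newBatchCreated) {
            senderWakeups++;
        }
        return partition;
    }

    private int partition(byte[] keyBytes) {
        // Stand-in hash; Kafka's default partitioner uses murmur2 on the key bytes instead.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

The same key always lands on the same partition, and the sender is only nudged when there is something new worth draining, which keeps the caller's thread from doing any network work.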
2 Obtaining metadata
When sending a message, the producer knows only the topic; the first time, it does not yet know the topic's metadata.
Look at the producer's send method:
(1) Obtain the topic metadata first. The first real step of doSend is waitOnMetadata, which 'blocks' until the metadata is obtained:
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
(2) Record the topic to be sent in the metadata's topics map.
(3) Try to get the cluster information from the metadata first. The first time a topic is sent to, its metadata is not available yet; on later sends it is already cached and returned straight away.
(4) Set the metadata's needUpdate flag to true and record the current metadata version for later comparison.
(5) Wake up the sender thread, then block in awaitUpdate:
awaitUpdate(final int lastVersion, final long maxWaitMs)
The await process is simple. It is a while loop: compute the remaining time from the configured timeout, then wait until either the sender thread wakes it up or the time runs out, and then check whether the version has been updated. If the version has not changed, the metadata has not been updated yet and it waits again. Note that throughout the process the producer manages the timeout itself, recomputing the remaining time and reporting a timeout once it is exceeded.
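The version-based wait in steps (4) and (5) is a monitor wait/notify loop. Below is a minimal sketch, assuming a hypothetical SketchMetadata class (not Kafka's Metadata) whose update() stands in for the sender thread processing a metadata response. The caller records the version, and awaitUpdate loops until the version moves past it, recomputing the remaining time after every wakeup. The timeout exception is unchecked here, mirroring Kafka's own org.apache.kafka.common.errors.TimeoutException.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the producer's metadata object; not Kafka's class.
class SketchMetadata {
    static class MetadataTimeoutException extends RuntimeException {
        MetadataTimeoutException(String msg) { super(msg); }
    }

    private final Set<String> topics = new HashSet<>();
    private int version = 0;
    private boolean needUpdate = false;

    // Step (2): record the topic the producer wants to send to.
    synchronized void add(String topic) { topics.add(topic); }

    synchronized boolean containsTopic(String topic) { return topics.contains(topic); }

    // Step (4): mark metadata as stale and return the version to compare against later.
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    synchronized boolean updateRequested() { return needUpdate; }

    synchronized int version() { return version; }

    // Sender-thread side: a metadata response arrived, so bump the version and wake waiters.
    synchronized void update() {
        needUpdate = false;
        version++;
        notifyAll();
    }

    // Step (5): block until version > lastVersion, recomputing the remaining time after every wakeup.
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (version <= lastVersion) {
            if (remainingWaitMs <= 0) {
                throw new MetadataTimeoutException("metadata not updated after " + maxWaitMs + " ms");
            }
            wait(remainingWaitMs); // returns on notifyAll(), on timeout, or spuriously
            remainingWaitMs = maxWaitMs - (System.currentTimeMillis() - begin);
        }
    }
}
```

Because the loop re-checks the version after every return from wait, spurious wakeups and early notifications are harmless; the guard on remainingWaitMs also avoids calling wait(0), which would block indefinitely.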
The producer's send is now waiting to be woken up by the sender thread. Look at the sender thread:
(1) The Sender runs on its own thread, which is started when the KafkaProducer is created, and its run method is a while loop. (2) this.client.ready checks whether a connection to the broker has been established and, if not, initiates one.
Check the connection: connectionStates.canConnect(node.idString(), now). Initiate the connection: initiateConnect(Node node, long now). Some key parameters:
// connect in non-blocking mode
socketChannel.configureBlocking(false);
// enable TCP keepalive (on Linux the default idle time before the first probe is 2 hours)
socket.setKeepAlive(true);
// disable Nagle's algorithm so small packets are sent immediately, reducing latency
socket.setTcpNoDelay(true);
Since the connection is established in non-blocking mode, something has to wait for it to complete later.
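The connect path above maps onto plain java.nio calls. The sketch below is hypothetical (NonBlockingConnect is not a Kafka class): it applies the three socket settings listed above, registers OP_CONNECT when the non-blocking connect is still pending, and later completes it with finishConnect, swapping OP_CONNECT for OP_READ. That later step is the "wait for the connection to complete" the text refers to.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical helper mirroring the connect path described above; not Kafka's Selector.
class NonBlockingConnect {

    // Start a non-blocking connect with the same socket options as initiateConnect.
    static SocketChannel connect(Selector selector, InetSocketAddress address) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);   // connect() returns without waiting for the handshake
        Socket socket = channel.socket();
        socket.setKeepAlive(true);          // TCP keepalive probes on idle connections
        socket.setTcpNoDelay(true);         // disable Nagle: small packets go out immediately
        boolean connected = channel.connect(address);
        // If the handshake finished at once (possible on loopback) go straight to OP_READ.
        channel.register(selector, connected ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT);
        return channel;
    }

    // Wait for the pending connect; on OP_CONNECT call finishConnect and swap in OP_READ.
    static boolean awaitConnected(Selector selector, SocketChannel channel, long timeoutMs) throws IOException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!channel.isConnected()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            selector.select(remaining);
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isConnectable() && ((SocketChannel) key.channel()).finishConnect()) {
                    key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT) | SelectionKey.OP_READ);
                }
            }
        }
        return true;
    }
}
```

Keeping connect and completion separate lets one thread drive many connections: the call that starts a connection never blocks, and the poll loop finishes whichever handshakes are ready.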
(3) Because there was no connection yet, one is started; much of that process can be skipped here. Look directly at the end of the sender's run: this.client.poll(pollTimeout, now) -> metadataUpdater.maybeUpdate(now). This is where the metadata pull request is encapsulated.
Typically, we only pull metadata for the topics we send to. maybeUpdate encapsulates a ClientRequest and calls doSend, which adds the metadata request to the inFlightRequests queue and hands it to the KafkaChannel. A KafkaChannel sends only one request at a time. The same component is also used on the server side. Naturally, once the request has been handed to the KafkaChannel, there must be a Java NIO channel behind it that does the actual sending.
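The bookkeeping around inFlightRequests and the one-send-per-channel rule can be sketched as follows. SketchInFlight is hypothetical; it assumes, as Kafka's InFlightRequests does, that new requests are added at the head of a per-node deque and that a response completes the oldest one from the tail. It also assumes a node accepts another request only once its newest send has been fully written and fewer than the configured maximum (max.in.flight.requests.per.connection in the producer config) are outstanding.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-node in-flight bookkeeping; not Kafka's NetworkClient classes.
class SketchInFlight {
    static final class Request {
        final String nodeId;
        final String payload;
        boolean sendCompleted = false; // set once the selector has written every byte

        Request(String nodeId, String payload) {
            this.nodeId = nodeId;
            this.payload = payload;
        }
    }

    private final int maxInFlightPerConnection;
    private final Map<String, Deque<Request>> inFlight = new HashMap<>();
    // Stand-in for KafkaChannel: at most one send may be in progress per node.
    private final Map<String, Request> pendingSend = new HashMap<>();

    SketchInFlight(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // Room for another request only if the newest one is fully written and the limit is not reached.
    boolean canSendMore(String nodeId) {
        Deque<Request> queue = inFlight.get(nodeId);
        return queue == null || queue.isEmpty()
                || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightPerConnection);
    }

    // Record the request as in flight and hand it to the node's single send slot.
    void doSend(Request request) {
        if (pendingSend.containsKey(request.nodeId)) {
            throw new IllegalStateException("a send to node " + request.nodeId + " is still in progress");
        }
        inFlight.computeIfAbsent(request.nodeId, k -> new ArrayDeque<>()).addFirst(request);
        pendingSend.put(request.nodeId, request);
    }

    // The selector finished writing the pending send (it would go to completedSends).
    void completeSend(String nodeId) {
        Request sent = pendingSend.remove(nodeId);
        if (sent != null) {
            sent.sendCompleted = true;
        }
    }

    // A response arrived: it answers the oldest in-flight request for that node.
    Request completeNext(String nodeId) {
        return inFlight.get(nodeId).pollLast();
    }
}
```

Matching responses to the oldest request works because a single TCP connection delivers responses in the order the broker sends them.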
An important part of doSend is that it registers interest in the OP_WRITE event on the corresponding connection.
(4) poll(pollTimeout, now) -> this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)) -> pollSelectionKeys. Kafka wraps the selected keys and handles each scenario (connect, read, write) separately.
With finishConnect, the selector completes the connection (the earlier connect was non-blocking, so it has to be finished here), and the underlying TransportLayer component removes OP_CONNECT from the selection key's interest set and adds OP_READ.
Because OP_WRITE was registered in step (3), once the connection is complete the write branch runs as well: the metadata request wrapped earlier is sent through the underlying channel and recorded in completedSends, indicating that it was sent successfully.
(5) Normally, some time later the server returns a response, so the OP_READ branch runs and reads the response data into the stagedReceives queue.
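The write and read branches of the poll loop can be sketched for a single, already connected channel. SketchPoller is hypothetical and simplified: it reads raw bytes, whereas Kafka reads size-prefixed frames into NetworkReceive objects, and it handles only one connection. As in the description, a pending send registers OP_WRITE, a fully written send is recorded in completedSends and OP_WRITE is dropped, and bytes that arrive on OP_READ are queued in stagedReceives.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-connection sketch of the write and read branches; not Kafka's Selector.
class SketchPoller {
    private final Selector selector;
    private final SocketChannel channel;
    private final SelectionKey key;
    private ByteBuffer pendingSend;                            // at most one send in progress
    final List<ByteBuffer> completedSends = new ArrayList<>(); // sends written in full
    final List<byte[]> stagedReceives = new ArrayList<>();     // raw bytes read (Kafka reads framed messages)

    SketchPoller(Selector selector, SocketChannel connectedChannel) throws IOException {
        this.selector = selector;
        this.channel = connectedChannel;
        channel.configureBlocking(false);
        this.key = channel.register(selector, SelectionKey.OP_READ);
    }

    // Queue a request and register interest in OP_WRITE.
    void send(ByteBuffer request) {
        if (pendingSend != null) {
            throw new IllegalStateException("previous send still in progress");
        }
        pendingSend = request;
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

    // One round of select(): read whatever arrived, then write the pending send if possible.
    void poll(long timeoutMs) throws IOException {
        selector.select(timeoutMs);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey ready = it.next();
            it.remove();
            if (ready.isReadable()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int n = channel.read(buffer);
                if (n > 0) {
                    buffer.flip();
                    byte[] bytes = new byte[buffer.remaining()];
                    buffer.get(bytes);
                    stagedReceives.add(bytes);
                } else if (n < 0) {
                    channel.close(); // peer closed the connection; this also cancels the key
                    continue;
                }
            }
            if (ready.isWritable() && pendingSend != null) {
                channel.write(pendingSend);
                if (!pendingSend.hasRemaining()) {
                    completedSends.add(pendingSend);
                    pendingSend = null;
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                }
            }
        }
    }
}
```

Dropping OP_WRITE after the send completes matters: a connected socket is almost always writable, so leaving the interest set would make select() return immediately in a busy loop.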
(6) Back in NetworkClient's core poll: the earlier maybeUpdate encapsulated the metadata request, and poll is responsible for sending the request and receiving the response. Follow handleCompletedReceives -> handleResponse -> this.metadata.update(cluster, now) to see the cluster being updated; the most important effect is that the version is incremented by one. Now return to awaitUpdate in the producer's send path: it has been waiting, loop after loop, for a new version, and once that version appears the message can actually continue on its way. Obtaining metadata is itself a request/response exchange, and it follows the same path as sending messages: NIO wrapped in several layers of abstraction, with network components separated from business components and communicating through intermediate queues.