At the end of the previous section we concluded that initializing KafkaProducer does not pull any metadata. It creates the Selector component and starts the Sender thread, whose select() call then blocks waiting for a response. Because no request has been sent yet, no metadata is actually pulled during initialization.
The first call to send() wakes up the Selector's blocked select(), so the Sender enters its second while loop and starts working towards the metadata request. Meanwhile the calling thread waits in Object.wait() for up to 60 seconds for the metadata to arrive from the Broker. If it arrives in time, the actual produce request continues; otherwise a metadata pull timeout exception is reported.
The diagram below:
How does the second while loop send the request to pull metadata, and how does notifyAll() wake up the waiting thread once the pull succeeds?
Let’s take a look today.
Second while loop – starts the metadata pull
The blocked select() has just been woken up. Do you remember the blocking logic?
pollSelectionKeys() only runs when the number of ready keys returned by select() is greater than 0. Because select() returned as a direct result of wakeup(), the number of ready keys is 0, so no pollSelectionKeys processing is performed.
And when the Selector's poll method returns, because pollSelectionKeys was not executed, handleCompletedSends, handleCompletedReceives, handleDisconnections, handleConnections, and handleTimedOutRequests have nothing to process and effectively do nothing. (You can set a breakpoint and verify this yourself.)
Once the logic above completes, the first loop ends and the second loop begins. The overall process is shown in the figure below (mainly the steps annotated in gray):
Executing maybeUpdate in the second loop
Now that we are in the second loop, maybeUpdate(), poll(), and the methods whose names start with handle are executed again.
Do you remember the core logic of maybeUpdate? It computes metadataTimeout from three time values and only performs the update when metadataTimeout is 0. The code is as follows:
// NetworkClient.java (DefaultMetadataUpdater)
@Override
public long maybeUpdate(long now) {
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
    long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
    // if there is no node available to connect, back off refreshing metadata
    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
            waitForMetadataFetch);

    if (metadataTimeout == 0) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        Node node = leastLoadedNode(now);
        maybeUpdate(now, node);
    }
    return metadataTimeout;
}

// Metadata.java
public synchronized long timeToNextUpdate(long nowMs) {
    long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
    long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
    return Math.max(timeToExpire, timeToAllowUpdate);
}
The first time through the loop metadataTimeout is non-zero, and the second time through the loop metadataTimeout is actually zero.
The reason is that, before waking up the Sender, the send path called metadata.requestUpdate();
This line of code changes the needUpdate flag to true, which makes timeToNextMetadataUpdate, one of the three values that determine metadataTimeout, evaluate to 0. With timeToNextMetadataUpdate, timeToNextReconnectAttempt and waitForMetadataFetch all at 0, metadataTimeout is naturally 0 as well.
As shown below:
So the second loop actually executes the logic of maybeUpdate, unlike the first loop, which did nothing.
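To make the three-way maximum concrete, here is a minimal sketch of the same arithmetic. The class name MetadataTimeoutSketch and the sample timestamps are made up for illustration; only the two formulas come from the code shown earlier.

```java
class MetadataTimeoutSketch {
    // Same formula as Metadata.timeToNextUpdate, with the fields passed in as parameters.
    static long timeToNextUpdate(boolean needUpdate, long lastSuccessfulRefreshMs, long metadataExpireMs,
                                 long lastRefreshMs, long refreshBackoffMs, long nowMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    // Same formula as the first half of maybeUpdate(long now).
    static long metadataTimeout(long timeToNextMetadataUpdate, long timeToNextReconnectAttempt,
                                boolean metadataFetchInProgress) {
        long waitForMetadataFetch = metadataFetchInProgress ? Integer.MAX_VALUE : 0;
        return Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
    }

    public static void main(String[] args) {
        // Hypothetical values: last refresh at t=1000 ms, 300 s expiry, 100 ms refresh backoff.
        long firstLoop = timeToNextUpdate(false, 1000, 300_000, 1000, 100, 1500);
        long secondLoop = timeToNextUpdate(true, 1000, 300_000, 1000, 100, 2000);
        System.out.println(metadataTimeout(firstLoop, 0, false));  // non-zero: nothing to do yet
        System.out.println(metadataTimeout(secondLoop, 0, false)); // zero: go and fetch metadata
        System.out.println(metadataTimeout(secondLoop, 0, true));  // non-zero: a fetch is already in flight
    }
}
```

The last call previews what happens in a later loop: once metadataFetchInProgress is true, the timeout is non-zero again no matter what the other two values are.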
If metadataTimeout=0, two main methods are executed:
1) leastLoadedNode: selects a Broker node to pull metadata from. It prefers Brokers that are already connected and have the least data waiting to be sent. We will examine this logic in detail later.
2) maybeUpdate(now, node): this overload is the critical one. It mainly contains the logic for either establishing a connection or initiating the metadata pull request.
So let’s take a look at the main logic of maybeUpdate:
/**
 * Add a metadata request to the list of sends if we can make one
 */
private void maybeUpdate(long now, Node node) {
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    String nodeConnectionId = node.idString();

    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        MetadataRequest metadataRequest;
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.allTopics();
        else
            metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
        ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        doSend(clientRequest, now);
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now);
        // If initiateConnect failed immediately, this node will be put into blackout and we
        // should allow immediately retrying in case there is another candidate node. If it
        // is still connecting, the worst case is that we end up setting a longer timeout
        // on the next round and then wait for the response.
    } else { // connected, but can't send more OR connecting
        // In either case, we just need to wait for a network event to let us know the selected
        // connection might be usable again.
        this.lastNoNodeAvailableMs = now;
    }
}
The logic above is relatively simple, mainly an if-else:
If the metadata pull request can be sent, call the doSend() method.
Otherwise, if the request cannot be sent because the connection has not been established yet, initialize the connection by calling initiateConnect().
The whole process is shown below:
How are NIO connections established before pulling metadata?
maybeUpdate relies on the canSendRequest and canConnect methods, backed by the ClusterConnectionStates component, to determine whether a connection with the Broker has been established. This component was mentioned in section 2: it is what NetworkClient uses to record the state of its connections to Brokers. The main code is as follows:
// NetworkClient.java
private boolean canSendRequest(String node) {
    return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}

// ClusterConnectionStates.java
public boolean canConnect(String id, long now) {
    NodeConnectionState state = nodeState.get(id);
    if (state == null)
        return true;
    else
        return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
}
Besides the connection state, these methods make a few finer-grained checks, which we don’t need to go into here.
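The way these two checks drive the three branches of maybeUpdate can be sketched with a small, self-contained state tracker. ConnectionDecisionSketch, its Action enum and the decide method are hypothetical names for illustration; the real client spreads this logic across NetworkClient, Selector and InFlightRequests, so the channel-ready and can-send-more checks are simply passed in as booleans here.

```java
import java.util.HashMap;
import java.util.Map;

class ConnectionDecisionSketch {
    enum State { DISCONNECTED, CONNECTING, CONNECTED }
    enum Action { SEND_METADATA_REQUEST, INITIATE_CONNECT, WAIT }

    private static final class NodeState {
        State state;
        long lastConnectAttemptMs;
        NodeState(State state, long lastConnectAttemptMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
        }
    }

    private final Map<String, NodeState> nodeState = new HashMap<>();
    private final long reconnectBackoffMs;

    ConnectionDecisionSketch(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    // Mirrors canConnect: unknown nodes may connect; disconnected ones only after the backoff.
    boolean canConnect(String id, long now) {
        NodeState s = nodeState.get(id);
        if (s == null)
            return true;
        return s.state == State.DISCONNECTED && now - s.lastConnectAttemptMs >= reconnectBackoffMs;
    }

    void connecting(String id, long now) { nodeState.put(id, new NodeState(State.CONNECTING, now)); }

    void connected(String id) { nodeState.get(id).state = State.CONNECTED; }

    // The if / else-if / else chain of maybeUpdate(now, node), reduced to a decision.
    Action decide(String id, long now, boolean channelReady, boolean canSendMore) {
        NodeState s = nodeState.get(id);
        boolean canSend = s != null && s.state == State.CONNECTED && channelReady && canSendMore;
        if (canSend)
            return Action.SEND_METADATA_REQUEST;
        if (canConnect(id, now))
            return Action.INITIATE_CONNECT;
        return Action.WAIT;
    }

    public static void main(String[] args) {
        ConnectionDecisionSketch states = new ConnectionDecisionSketch(50);
        System.out.println(states.decide("0", 0, true, true));  // unknown node: connect first
        states.connecting("0", 0);
        System.out.println(states.decide("0", 10, true, true)); // still connecting: wait
        states.connected("0");
        System.out.println(states.decide("0", 20, true, true)); // connected: send the request
    }
}
```

Read top to bottom, the three printed decisions correspond to what the second loop, an in-between poll, and the third loop would do for the same node.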
Since no connection has been established with the Broker yet, the initiateConnect() method is called to establish one. Let’s take a look.
/**
 * Initiate a connection to the given node
 */
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        this.connectionStates.connecting(nodeConnectionId, now);
        selector.connect(nodeConnectionId,
                         new InetSocketAddress(node.host(), node.port()),
                         this.socketSendBuffer,
                         this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnected(nodeConnectionId, now);
        /* maybe the problem is our metadata, update it */
        metadataUpdater.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}

// ClusterConnectionStates.java
public void connecting(String id, long now) {
    nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now));
}
The core logic is very simple, just two statements:
1) connectionStates.connecting() records the connection state as CONNECTING. There is not much more to say about it.
2) selector.connect() calls the connect method of the Selector that Kafka wraps around NIO, which is the key to establishing the connection.
The Selector’s connect method is crucial, so let’s look at what its code is doing:
// org.apache.kafka.common.network.Selector.java
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);

    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    key.attach(channel);
    this.channels.put(id, channel);

    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        immediatelyConnectedKeys.add(key);
        key.interestOps(0);
    }
}

// PlaintextChannelBuilder.java
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
    KafkaChannel channel = null;
    try {
        PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
        Authenticator authenticator = new DefaultAuthenticator();
        authenticator.configure(transportLayer, this.principalBuilder, this.configs);
        channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
    } catch (Exception e) {
        log.warn("Failed to create channel due to ", e);
        throw new KafkaException(e);
    }
    return channel;
}
The core logic of the connect() method is as follows:
1) SocketChannel.open() creates a NIO SocketChannel.
2) Set some Socket parameters and initiate the connection through the SocketChannel (the NIO equivalent of Hello World).
3) Register the SocketChannel with the Selector for the connect event, SelectionKey.OP_CONNECT; the returned SelectionKey ties the SocketChannel to the Selector.
4) buildChannel wraps the relationship between SocketChannel, Selector and SelectionKey in a KafkaChannel, which also holds an object called TransportLayer. Then key.attach(channel) binds the KafkaChannel to the SelectionKey.
5) Cache the KafkaChannel in the Map<String, KafkaChannel> channels.
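If you want to try steps 1) to 3) in isolation, the following is a minimal plain-NIO sketch, not Kafka code. It assumes a throwaway listener on the loopback interface standing in for a Broker, attaches a plain String where Kafka would attach a KafkaChannel, and the class and method names are invented for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingConnectSketch {
    // Connects to a local stand-in "broker" without blocking; returns true once connected.
    static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Stand-in for a Broker: a listener on an ephemeral loopback port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Steps 1) and 2): open the channel, configure the socket, start connecting.
            client.configureBlocking(false);
            Socket socket = client.socket();
            socket.setKeepAlive(true);
            socket.setTcpNoDelay(true);
            boolean connected = client.connect(server.getLocalAddress());

            // Step 3): register interest in OP_CONNECT and attach per-connection state.
            SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);
            key.attach("node-0"); // Kafka attaches a KafkaChannel here

            // Poll until the connect event fires (or give up after five seconds).
            long deadline = System.currentTimeMillis() + 5_000;
            while (!connected && System.currentTimeMillis() < deadline) {
                selector.select(100);
                if (selector.selectedKeys().remove(key) && key.isConnectable())
                    connected = client.finishConnect();
            }
            return connected;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected = " + connectToLocalServer());
    }
}
```

On some platforms a loopback connect completes immediately and connect() itself returns true, which is exactly the case Kafka handles with immediatelyConnectedKeys.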
The entire logic is shown below:
At this point the initiateConnect() method completes and the maybeUpdate method returns. The next step in the second while loop is Selector.poll().
As shown by the pink line below:
We already know that Selector.poll() calls select() on the underlying NIO Selector and blocks until an event of interest arrives.
If you are familiar with NIO, you will know that once the connection is successfully established, the registered SelectionKey becomes ready for the event it is interested in, SelectionKey.OP_CONNECT, and select() stops blocking.
This process is shown below:
The pollSelectionKeys() method must then be executed:
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        lruConnections.put(channel.id(), currentTimeNanos);

        try {
            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                } else
                    continue;
            }

            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }
    }
}
The logic of this method is not obvious at first glance, but that doesn’t matter: we can step through it in a debugger.
You can see that this method basically iterates over the set of SelectionKeys that are ready, and since we only registered one SelectionKey, for the connect event, we only get one.
If you then step through from the breakpoint, you will see that the core of the while loop executes the following statements:
// Selector.java
private final List<String> connected;

// inside pollSelectionKeys
if (isImmediatelyConnected || key.isConnectable()) {
    if (channel.finishConnect()) {
        this.connected.add(channel.id());
        this.sensors.connectionCreated.record();
    } else
        continue;
}

// KafkaChannel.java
public boolean finishConnect() throws IOException {
    return transportLayer.finishConnect();
}

// PlaintextTransportLayer.java
public boolean finishConnect() throws IOException {
    boolean connected = socketChannel.finishConnect();
    if (connected)
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}
The core of the if-else code is as follows:
First, channel.finishConnect() determines whether the connection has been established; underneath, it is simply NIO's socketChannel.finishConnect(). If the connection is established, the SelectionKey's interest set is changed: OP_CONNECT is removed and SelectionKey.OP_READ is added. The id of the channel that completed the connection is then cached in the List<String> connected collection.
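The interest-set update in finishConnect is a one-line bit manipulation that is easy to misread, so here is a tiny sketch of it on plain ints. InterestOpsSketch is a made-up helper; the expression itself is the one from PlaintextTransportLayer. In Java, `~` binds tighter than `&`, which binds tighter than `|`, so the expression reads as (ops & ~OP_CONNECT) | OP_READ.

```java
import java.nio.channels.SelectionKey;

class InterestOpsSketch {
    // Clear the OP_CONNECT bit, then set the OP_READ bit; every other bit is preserved.
    static int afterFinishConnect(int ops) {
        return ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
    }

    public static void main(String[] args) {
        int before = SelectionKey.OP_CONNECT;
        int after = afterFinishConnect(before);
        System.out.println("interested in connect: " + ((after & SelectionKey.OP_CONNECT) != 0));
        System.out.println("interested in read:    " + ((after & SelectionKey.OP_READ) != 0));
    }
}
```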
The overall picture is as follows:
The poll method has now finished, which completes the second step of the second while loop. Finally, the loop executes the group of handle methods:
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
You can actually guess, which method will be executed after the connection is established? That’s right, handleConnections() is executed, the other methods don’t execute at all, they just return.
What logic does handleConnections perform?
// NetworkClient.java
private void handleConnections() {
    for (String node : this.selector.connected()) {
        log.debug("Completed connection to node {}", node);
        this.connectionStates.connected(node);
    }
}

// Selector.java
public List<String> connected() {
    return this.connected;
}

// ClusterConnectionStates.java
public void connected(String id) {
    NodeConnectionState nodeState = nodeState(id);
    nodeState.state = ConnectionState.CONNECTED;
}
It simply iterates over the Nodes (Brokers) whose Channel has finished connecting and records each Node's state as CONNECTED. (Remember that initiateConnect(), called from maybeUpdate, recorded it as CONNECTING?)
At this point the second while loop is complete. It ran the same maybeUpdate() -> poll() -> handle sequence, and its main job was to establish a NIO connection with the Broker.
In the first loop, by contrast, maybeUpdate() -> poll() -> handle only blocked in poll() and did nothing else.
The overall process of the second cycle is summarized in the big picture below:
After the second round of logic, are you now more familiar with Producer?
After that come the third and later while loops, all running the same maybeUpdate() -> poll() -> handle sequence again.
Sending the metadata pull request
In the Sender's third loop, the first step is again maybeUpdate(). This time the connection has already been established, so maybeUpdate() takes a different branch: the doSend() path, which actually sends the request to pull metadata. Let’s have a look!
/**
 * Add a metadata request to the list of sends if we can make one
 */
private void maybeUpdate(long now, Node node) {
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    String nodeConnectionId = node.idString();

    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        MetadataRequest metadataRequest;
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.allTopics();
        else
            metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
        ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        doSend(clientRequest, now);
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now);
        // If initiateConnect failed immediately, this node will be put into blackout and we
        // should allow immediately retrying in case there is another candidate node. If it
        // is still connecting, the worst case is that we end up setting a longer timeout
        // on the next round and then wait for the response.
    } else { // connected, but can't send more OR connecting
        // In either case, we just need to wait for a network event to let us know the selected
        // connection might be usable again.
        this.lastNoNodeAvailableMs = now;
    }
}
This time, when maybeUpdate runs, it is the canSendRequest check that matters:
// NetworkClient.java
private boolean canSendRequest(String node) {
    return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}

// ClusterConnectionStates.java
public boolean isConnected(String id) {
    NodeConnectionState state = nodeState.get(id);
    return state != null && state.state == ConnectionState.CONNECTED;
}

// Selector.java
public boolean isChannelReady(String id) {
    KafkaChannel channel = this.channels.get(id);
    return channel != null && channel.ready();
}

// PlaintextTransportLayer.java
public boolean ready() {
    return true;
}

// InFlightRequests.java
private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();

public boolean canSendMore(String node) {
    Deque<ClientRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
Several components are involved above; doSend is only called when all three conditions are true:
connectionStates.isConnected(node): this must be true, because the connection state has been recorded as CONNECTED.
selector.isChannelReady(node): the previously established KafkaChannel is cached in the channels map, and with the plaintext transport shown above channel.ready() returns true by default.
inFlightRequests.canSendMore(node): true when the node's request queue is null or empty, or when the request at the head of the queue has been completely sent and the queue holds fewer than maxInFlightRequestsPerConnection elements (a configurable limit, 5 by default).
At this point the queue is still completely empty, so this condition is also true.
/*
 * InFlightRequests holds a Map<String, Deque<ClientRequest>> named requests: a map of double-ended queues.
 * We mentioned this component earlier in our analysis of NetworkClient, where its comment described
 * InFlightRequests as the collection of requests that have been sent or are being sent but have not yet
 * received a response. Back then it was not clear exactly what it was for.
 * Here we can see that, before a request is sent, it is placed into this in-memory structure for temporary
 * storage, which matches the comment well. The structure is also used to decide whether another request
 * may be sent.
 */
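The canSendMore rule is compact enough to try out on its own. Below is a minimal sketch of the same condition; InFlightSketch and PendingRequest are hypothetical stand-ins for InFlightRequests and ClientRequest, and the sketch assumes new requests are pushed onto the head of the deque, so peekFirst() is the most recently queued one.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class InFlightSketch {
    // Hypothetical stand-in for ClientRequest: only tracks whether its bytes were fully written.
    static final class PendingRequest {
        boolean sendCompleted;
        PendingRequest(boolean sendCompleted) { this.sendCompleted = sendCompleted; }
    }

    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<PendingRequest>> requests = new HashMap<>();

    InFlightSketch(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    // Assumption: the newest request goes to the head of the node's deque.
    void add(String node, PendingRequest request) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
    }

    // Same condition as canSendMore in the walkthrough.
    boolean canSendMore(String node) {
        Deque<PendingRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty()
                || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightRequestsPerConnection);
    }

    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch(2);
        System.out.println(inFlight.canSendMore("0")); // no queue yet, so sending is allowed
        PendingRequest first = new PendingRequest(false);
        inFlight.add("0", first);
        System.out.println(inFlight.canSendMore("0")); // head request still being written
        first.sendCompleted = true;
        System.out.println(inFlight.canSendMore("0")); // written, and below the limit of 2
    }
}
```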
That is, when the connection is established, the third loop executes the doSend method logic.
As shown below:
Once the if condition passes, the following logic is executed:
if (canSendRequest(nodeConnectionId)) {
    this.metadataFetchInProgress = true;
    MetadataRequest metadataRequest;
    if (metadata.needMetadataForAllTopics())
        metadataRequest = MetadataRequest.allTopics();
    else
        metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
    ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
    log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
    doSend(clientRequest, now);
}

// RequestSend.java
public RequestSend(String destination, RequestHeader header, Struct body) {
    super(destination, serialize(header, body));
    this.header = header;
    this.body = body;
}

public static ByteBuffer serialize(RequestHeader header, Struct body) {
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
    header.writeTo(buffer);
    body.writeTo(buffer);
    buffer.rewind();
    return buffer;
}
As you can see, the request parameters are wrapped in several layers before doSend, and the resulting object is serialized into a ByteBuffer. (What format is it serialized in? We won’t go into that now; we will come back to it later when we look at how Kafka solves the TCP sticky-packet and packet-splitting problems.)
MetadataRequest -> RequestHeader + Struct = RequestSend (serialized into a ByteBuffer) -> ClientRequest
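The allocate / write / rewind pattern in serialize can be illustrated without any Kafka classes. In the sketch below the header is just a short plus an int and the body is a UTF-8 string; this field layout is invented for the example and is not Kafka's wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SerializeSketch {
    // Hypothetical layout: [short apiKey][int correlationId][body bytes]. Not Kafka's real format.
    static ByteBuffer serialize(short apiKey, int correlationId, String body) {
        byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
        int headerSize = Short.BYTES + Integer.BYTES;
        ByteBuffer buffer = ByteBuffer.allocate(headerSize + bodyBytes.length);
        buffer.putShort(apiKey);
        buffer.putInt(correlationId);
        buffer.put(bodyBytes);
        // Reset the position to 0 so that a later channel write starts from the first byte.
        buffer.rewind();
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = serialize((short) 1, 7, "demo");
        System.out.println("position=" + buffer.position() + ", remaining=" + buffer.remaining());
    }
}
```

Because the buffer is allocated at exactly the header size plus the body size, rewind() leaves position at 0 and limit at capacity, so the whole request is ready to be written out.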
After wrapping the request, the doSend method is called:
// NetworkClient.java
private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}

// ClientRequest.java
public RequestSend request() {
    return request;
}

// Selector.java
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

private KafkaChannel channelOrFail(String id) {
    KafkaChannel channel = this.channels.get(id);
    if (channel == null)
        throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet());
    return channel;
}

// KafkaChannel.java
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// PlaintextTransportLayer.java
public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);
}
This method is a bit more interesting. The main steps of doSend are as follows:
1) The request is temporarily stored in the inFlightRequests in-memory structure.
2) The Selector retrieves the previously cached KafkaChannel from the map.
3) The KafkaChannel records the RequestSend to be sent and adds OP_WRITE to its interest set (earlier, OP_CONNECT was removed and OP_READ was added).
The operations above are basically routine NIO: get a Channel and set the events it is interested in. But…
What about channel.write? No data is written out here, right? That is why the KafkaChannel method is called setSend: it only records the Send object and registers interest in OP_WRITE.
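This "stage now, write later" pattern can be sketched in a few lines. DeferredSendSketch is a made-up class that keeps the interest set in a plain int and writes to any WritableByteChannel; it only illustrates the idea and is not how KafkaChannel is implemented internally.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;

class DeferredSendSketch {
    private ByteBuffer pending;                     // the staged send, if any
    private int interestOps = SelectionKey.OP_READ; // state after the connection completed

    // Stage the buffer and ask to be told when the socket is writable. Nothing is written yet.
    void setSend(ByteBuffer send) {
        if (pending != null)
            throw new IllegalStateException("prior send still in progress");
        pending = send;
        interestOps |= SelectionKey.OP_WRITE;
    }

    // Called when the key is writable. Returns true once the staged buffer is fully written.
    boolean writeIfPending(WritableByteChannel out) {
        if (pending == null)
            return false;
        try {
            out.write(pending);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (pending.hasRemaining())
            return false; // partial write: keep OP_WRITE and try again on the next poll
        pending = null;
        interestOps &= ~SelectionKey.OP_WRITE;
        return true;
    }

    int interestOps() { return interestOps; }

    public static void main(String[] args) {
        DeferredSendSketch channel = new DeferredSendSketch();
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        channel.setSend(ByteBuffer.wrap(new byte[] {1, 2, 3}));
        System.out.println("bytes written after setSend: " + sink.size());
        channel.writeIfPending(Channels.newChannel(sink));
        System.out.println("bytes written after write:   " + sink.size());
    }
}
```

Removing OP_WRITE once the buffer is drained matters because a socket is writable almost all the time; leaving the bit set would make every select() return immediately.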
The whole process is shown below:
After doSend completes, metadataUpdater.maybeUpdate returns. Next comes the second step of the third loop, the selector.poll() method, and finally the methods starting with handle are executed. You should be familiar with this sequence by now.
The core of selector.poll() has two steps:
1) selector.select() blocks and waits for an event the client is interested in.
2) pollSelectionKeys() iterates over the ready SelectionKeys and processes each one.
Because the client is now interested in the OP_READ and OP_WRITE events, and a freshly connected socket is writable right away, selector.select() stops blocking in the third loop and pollSelectionKeys() is executed.
On the third while loop, the core logic that pollSelectionKeys in Selector.java runs while iterating over the SelectionKeys is as follows:
// Core logic of pollSelectionKeys for each SelectionKey on the third while loop
if (channel.ready() && key.isWritable()) {
    Send send = channel.write();
    if (send != null) {
        this.completedSends.add(send);
        this.sensors.recordBytesSent(channel.id(), send.size());
    }
}
The core of the above is:
1) Send the request for pulling metadata through channel.write()!
2) After sending, record the sent request in the List<Send> completedSends.
Here we finally see channel.write(): the underlying layer writes out the previously serialized ByteBuffer via NIO's SocketChannel.write. Once the send is complete, SelectionKey.OP_WRITE is removed from the key's interest set, so no more data is written out.
// KafkaChannel.java
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}

// PlaintextTransportLayer.java
public long write(ByteBuffer[] srcs) throws IOException {
    return socketChannel.write(srcs);
}
The data is finally sent and the whole process can be summarized as follows:
On the third execution of the while loop, the maybeUpdate() and poll() methods have now run, and finally the handle methods are executed.
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
Of these, the one that definitely has something to iterate over this time is the handleCompletedSends method.
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // if no response is expected then when the send is completed, return it
    for (Send send : this.selector.completedSends()) {
        ClientRequest request = this.inFlightRequests.lastSent(send.destination());
        if (!request.expectResponse()) {
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(new ClientResponse(request, now, false, null));
        }
    }
}
request.expectResponse() defaults to true, so the if condition is not met and handleCompletedSends does essentially nothing. Judging from the comment, this method is meant to handle the case "if no response is expected, return it when the send is completed." In other words, this is not key logic, so let’s just skip it.
As you get more experience reading source code, you’ll often come across this kind of non-core logic. At that point you must learn to make trade-offs: hold on to the main line and let the minor details go.
With that, the methods starting with handle have all been executed. Time to enter the fourth while loop.
Receiving the pulled metadata and waking up the KafkaProducer.send method
You can probably imagine what the fourth while loop does: after the server returns the metadata, the blocked KafkaProducer.send method is woken up. Having learned from the previous three while loops, let’s go straight to the core logic and see how it works. Take a quick look!
1) First execute maybeUpdate:
On the fourth while loop, metadataFetchInProgress is still true from the third loop, so waitForMetadataFetch in maybeUpdate evaluates to a non-zero value, causing maybeUpdate to do nothing, just as in the first loop.
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
2) Then execute poll: this time pollSelectionKeys is triggered by the OP_READ event and runs the following logic:
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
    NetworkReceive networkReceive;
    while ((networkReceive = channel.read()) != null)
        addToStagedReceives(channel, networkReceive);
}

Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;

/**
 * adds a receive to staged receives
 */
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}
This logic reads the bytes into a ByteBuffer and wraps it in a NetworkReceive. Underneath, it calls SocketChannel’s read() method, which is a routine NIO operation, much like the code that sends the data. I won’t walk you through the lower levels here; I believe you can read them yourself.
In addition to being read into a NetworkReceive object, the received data is also temporarily stored in a double-ended queue: Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives.
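The staging structure is just a map of per-channel queues. Here is a minimal sketch of it, using a String channel id and byte[] payloads in place of KafkaChannel and NetworkReceive; StagedReceivesSketch and the pollNext helper are hypothetical names added for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class StagedReceivesSketch {
    private final Map<String, Deque<byte[]>> stagedReceives = new HashMap<>();

    // Same idea as addToStagedReceives: create the per-channel queue on first use, then append.
    void addToStagedReceives(String channelId, byte[] receive) {
        stagedReceives.computeIfAbsent(channelId, k -> new ArrayDeque<>()).add(receive);
    }

    boolean hasStagedReceive(String channelId) {
        Deque<byte[]> deque = stagedReceives.get(channelId);
        return deque != null && !deque.isEmpty();
    }

    // Hypothetical helper: hand out the oldest staged receive for a channel, or null if none.
    byte[] pollNext(String channelId) {
        Deque<byte[]> deque = stagedReceives.get(channelId);
        return deque == null ? null : deque.poll();
    }

    public static void main(String[] args) {
        StagedReceivesSketch staged = new StagedReceivesSketch();
        staged.addToStagedReceives("0", new byte[] {42});
        System.out.println(staged.hasStagedReceive("0")); // a receive is waiting for node 0
        staged.pollNext("0");
        System.out.println(staged.hasStagedReceive("0")); // the queue has been drained
    }
}
```

Keeping one queue per channel preserves the order in which responses arrived on each connection.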
After the poll method, it is time to execute the methods starting with handle. This time the one that does the work is handleCompletedReceives():
/**
 * Handle any completed receives and update the response list with the responses received.
 *
 * @param responses The list of responses to update
 * @param now The current time
 */
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        Struct body = parseResponse(receive.payload(), req.request().header());
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}

// NetworkClient.java (DefaultMetadataUpdater)
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
    short apiKey = req.request().header().apiKey();
    if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
        handleResponse(req.request().header(), body, now);
        return true;
    }
    return false;
}
The logic of this method is simple:
1) Take the ClientRequest that was stored earlier in inFlightRequests, match it with the received NetworkReceive, and parse the ByteBuffer payload into a Struct object.
2) Execute the maybeHandleCompletedReceive method of DefaultMetadataUpdater.
And what does DefaultMetadataUpdater's maybeHandleCompletedReceive do next? It calls handleResponse:
private void handleResponse(RequestHeader header, Struct body, long now) {
    this.metadataFetchInProgress = false;
    MetadataResponse response = new MetadataResponse(body);
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);

    // don't update the cluster if there are no valid nodes... the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (cluster.nodes().size() > 0) {
        this.metadata.update(cluster, now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
        this.metadata.failedUpdate(now);
    }
}

// Metadata.java
public synchronized void update(Cluster cluster, long now) {
    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;

    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster);

    // Do this after notifying listeners as subscribed topics' list can be changed by listeners
    this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
The core of the code is as follows:
1) After maybeHandleCompletedReceive, the Struct object is converted to a MetadataResponse and then to a Cluster object.
2) If the Cluster contains more than 0 Nodes, the metadata.update() method is executed, which runs some Listener callbacks. Finally, and this is the key, the notifyAll() call inside metadata.update() wakes up KafkaProducer.send(), which was blocked waiting.
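The wait + notifyAll handshake between the sending thread and the Sender thread can be reproduced in a few lines. The sketch below is a simplified model, assuming the waiter remembers the version it saw when requesting the update and waits until the version moves past it; MetadataWaitSketch and its demo method are invented for the example, and the 50 ms sleep merely simulates the network round trip.

```java
class MetadataWaitSketch {
    private int version = 0;
    private boolean needUpdate = false;

    // Called by the producing thread: flag that an update is needed, remember the current version.
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    // Called by the Sender thread when a metadata response arrives.
    synchronized void update() {
        needUpdate = false;
        version += 1;
        notifyAll(); // wake every thread blocked in awaitUpdate
    }

    // Blocks until the version moves past lastVersion; returns false on timeout or interrupt.
    synchronized boolean awaitUpdate(int lastVersion, long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                return false;
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Simulates send() waiting while a background "Sender" delivers the metadata 50 ms later.
    static boolean demo() {
        MetadataWaitSketch metadata = new MetadataWaitSketch();
        int seenVersion = metadata.requestUpdate();
        Thread sender = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            metadata.update();
        });
        sender.start();
        return metadata.awaitUpdate(seenVersion, 5_000);
    }

    public static void main(String[] args) {
        System.out.println("metadata arrived in time: " + demo());
    }
}
```

Waiting in a loop on the version number, rather than on a single wait() call, protects against spurious wakeups and against an update that lands before the waiter starts waiting.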
The whole process is summarized as follows:
Conclusion
That wraps up our study of how metadata pulling works in the source code. Looking back, you will notice that we went through the core while loop four times, repeating the same process over and over, and the source code turned out to be not that hard.
That is how it often goes: most of the time, simple things are repeated. As long as you keep thinking, you will find the patterns and gradually understand the essence of things. This way of thinking is much more important than the details of how Kafka pulls metadata.
As for the metadata pull itself, it is not complicated: it comes down to establishing a connection, sending a request, and handling the response. Along the way Kafka uses some interesting mechanisms, such as wait + notifyAll and NIO.
I’ve been drawing detailed logic diagrams for you, but you can also draw your own schematic and summarize the logic. If you can draw a picture and explain it to someone else, you really understand it.
Kafka's design reflects a lot of thought here, and you can think about its highlights and advantages, just as with the ZK election principle. The ideas you arrive at yourself are worth much more than the knowledge itself. Leave me a comment and let’s discuss!
Although the Kafka growth series is oriented towards technical depth, if you are familiar with NIO you will have had no trouble following the NIO parts of the metadata pull process.
If you are not familiar with NIO, you can look up the basics yourself, or follow my upcoming NIO beginner boot camp.
See you in the next section!