Welcome to github.com/hsfxuebao/j… , hope to help you, if you feel ok, please click on the Star
Let’s look directly at NetworkClient’s poll() method:
Poll (long timeout, long now) {public List<ClientResponse> poll(long timeout, long now) {// Step 3: process the response, and the response will have the metadata we need. handleAbortedSends(responses); handleCompletedSends(responses, updatedNow); /** * this is where we look at how producers get metadata. * Kafak fetches metadata in exactly the same way we send messages. * Get metadata -> Determine whether a network connection is established -> Establish a network connection * -> Send requests (requests for metadata) -> Server sends back responses (with clustered metadata information) */ handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); handleInitiateApiVersionRequests(updatedNow); HandleTimedOutRequests (responses, updatedNow); // invoke callbacks for (ClientResponse response : Responses) {try {// The callback function of the request we sent earlier // Seeing here, we go back to look at how we encapsulated the request when we sent the request. // We haven't seen it yet, but we can hazard a guess. // When encapsulating the network request, it must bind a callback function to it. response.onComplete(); } catch (Exception e) { log.error("Uncaught error in request completion:", e); } } return responses; }Copy the code
HandleTimedOutRequests (Responses, updatedNow)
Private void handleTimedOutRequests(List<ClientResponse> Responses, long Now) {// Obtain hosts that requested timeouts. List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs); For (String nodeId: nodeIds) {// Close connection to the node // Close connection to the timeout host this.selector. log.debug("Disconnecting from node {} due to request timeout.", nodeId); ProcessDisconnection (responses, nodeId, now); } // we disconnected, so we should probably refresh our metadata if (! nodeIds.isEmpty()) metadataUpdater.requestUpdate(); } private void processDisconnection(List<ClientResponse> responses, String nodeId, Long) {/ / modify the connection state connectionStates disconnected (nodeId, now); nodeApiVersions.remove(nodeId); nodesNeedingApiVersionsFetch.remove(nodeId); for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) { log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId); if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id) metadataUpdater.handleDisconnection(request.destination); Else // Process these requests // You can see an interesting thing // encapsulate a response. There is no server response message in this response (the server did not give the response) // The disconnected state table identifies it as true response.add (request.disconnected(now)); }}Copy the code
In other words, for a request that has not received a response for a long time, the Producer encapsulates the response message and returns it.
Reference Documents:
Kafka 0.10.2.0-src kafka 0.10.2.0-src kafka 0.10.2.0-src
Kafka technology insider – Graphic details kafka source code design and implementation
Kafka source code analysis series