Welcome to github.com/hsfxuebao/j… , hope to help you, if you feel ok, please click on the Star
If the network is not set up, it will not send a message. In this article, we explain how producer and broker establish a connection, and look directly at the sender thread step 8:
{// omit. /** * NetWorkClient is a component that performs all network operations. This component includes sending requests and receiving responses (processing responses). this.client.poll(pollTimeout, now); }Copy the code
Next, NetworkClient’s poll() method:
Public List<ClientResponse> poll(long timeout, long now) {public List<ClientResponse> poll(long timeout, long now) { So we don't have to analyze it in detail. We probably know how to get metadata. After we analyze Kafka's network, when we look back at the code *, the code is relatively simple. * / / / step one: encapsulates a metadata request to pull long metadataTimeout = metadataUpdater. MaybeUpdate (now); Try {// Step 2: send a request, do a complex network operation // But we haven't learned about kafka's network yet // So all you need to know is that it sends a network request. this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O", e); } // omit. } public void poll(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("timeout should be >= 0"); clear(); if (hasStagedReceives() || ! immediatelyConnectedKeys.isEmpty()) timeout = 0; /* check ready keys */ long startSelect = time.nanoseconds(); Int readyKeys = select(timeout); long endSelect = time.nanoseconds(); this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); / / because of the way we use scenario-driven / / we had just registered is indeed a key if (readyKeys > 0 | |! ImmediatelyConnectedKeys. IsEmpty ()) {/ / will immediately to deal with the Selector key to above. pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); } //TODO receives () from stagedReceives; long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); // we use the time at the end of select to ensure that we don't close any connections that // have just been processed in pollSelectionKeys maybeCloseOldestConnection(endSelect); }Copy the code
Java NIO Selector. Select -> it’s responsible for looking at the multiple channels registered with him, who has a response to receive, or who can now send a request, and if a Channel is ready to do an IO read or write, Now I’m going to return the SelectionKey of that Channel
The collection of SelectionKeys is then processed. At this point, we can see many enterprise-level functions developed based on NIO. One is how SocketChannel is built, and two are how a client can connect to multiple servers. How do three call Selector. Select via polling
Select in this scenario, you can set a timeout, and then you can get the SelectionKeys
Let’s focus on the pollSelectionKeys() method:
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); KafkaChannel KafkaChannel channel = channel(key); // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); if (idleExpiryManager ! = null) idleExpiryManager.update(channel.id(), currentTimeNanos); try { /* complete any connections that have finished their handshake (either normally or immediately) */ /** * So the first time our code comes in, we should go to this branch, because what we registered was * SelectionKey. Key = socketChannel.register(nioSelector, * selectionkey.op_connect); . * / if (isImmediatelyConnected | | key isConnectable ()) {/ / TODO core code to the / / to finally complete the connection of network initialization, before / / if we didn't finish the network connection, This will definitely help you complete the network connection. If (channel.finishConnect()) {// After the network connection is complete, store the channel to this.connected. Add (channel.id()); this.sensors.connectionCreated.record(); SocketChannel socketChannel = (SocketChannel) key.channel(); log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}", socketChannel.socket().getReceiveBufferSize(), socketChannel.socket().getSendBufferSize(), socketChannel.socket().getSoTimeout(), channel.id()); } else continue; } // omit. } } private static class IdleExpiryManager { private final Map<String, Long> lruConnections; private final long connectionsMaxIdleNanos; private long nextIdleCloseCheckTime; }Copy the code
IdleExpiryManager contains lruConnections because in general a client can not put too many Socket connection resources, otherwise it will lead to the client complex too heavy, so he needs to use the LRU way to continue to eliminate the least recently used some connections, Many connections have not been sending messages recently.
- For example, if you have a connection that was last used an hour ago, and a connection that was last used a minute ago, who would you choose to eliminate a connection? The LRU algorithm, apparently, eliminates the connection that was used just an hour ago
If SelectionKey is found to be in a state where a connection can be established, the isConnectable method is true, and then it essentially calls the finishConnect method of the bottom SocketChannel in KafkaChannel and waits for the connection to complete
And then you’re not going to care about the OP_CONNECT event, so for this Channel, then the Selector is not going to care about the connection event, it’s not going to care about the OP_READ event, it’s going to care about the OP_WRITE event, it’s going to write to that connection.
At this point, the Producer finally establishes a connection with the broker.
Reference Documents:
Kafka 0.10.2.0-src kafka 0.10.2.0-src kafka 0.10.2.0-src
Kafka technology insider – Graphic details kafka source code design and implementation
Kafka source code analysis series