Welcome to github.com/hsfxuebao/j… , hope to help you, if you feel ok, please click on the Star

Kafka source code analysis 7- Filtering brokers that can send messages how to establish a network connection, this chapter will explain this part in detail.

Network core component analysis

Let’s first introduce the core components of network design. The corresponding codes of NetworkClient, Selector, KafkaChannel and TransportLayer are as follows:

public class NetworkClient implements KafkaClient { private static final Logger log = LoggerFactory.getLogger(NetworkClient.class); /* the selector used to perform network i/o */ private final Selectable selector; private final MetadataUpdater metadataUpdater; private final Random randOffset; /* the state of each node's connection */ private final ClusterConnectionStates connectionStates; /* the set of requests currently being sent or awaiting a response */ private final InFlightRequests inFlightRequests; // omit... }Copy the code

NetworkClient is a network communication component. The Selector of the underlying core is responsible for establishing the core connection, initiating the request, and processing the actual network IO. The initialization entry is initially found. Kafka encapsulates some of its own components, but underneath it is a Selector with the most core Java NIO.

/*** * todo implements Selectable {public class Selector implements Selectable {public static final long NO_IDLE_TIMEOUT_MS = -1; private static final Logger log = LoggerFactory.getLogger(Selector.class); // This object is the Selector in javaNIO. // The Selector is responsible for setting up the network, sending the network request, and handling the actual network IO. // So it is the most core such a component. private final java.nio.channels.Selector nioSelector; SocketChannel SocketChannel SocketChannel SocketChannel SocketChannel SocketChannel SocketChannel private final Map<String, KafkaChannel> channels; Private final List<Send> completedSends; // The response that has been received and finished processing. private final List<NetworkReceive> completedReceives; // A response that has been received but has not yet been processed. Private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives; private final Set<SelectionKey> immediatelyConnectedKeys; private final Map<String, KafkaChannel> closingChannels; Private final List<String> disconnected; Private final List<String> Connected; // Establish the connection to the failed host. private final List<String> failedSends; private final Time time; private final SelectorMetrics sensors; private final String metricGrpPrefix; private final Map<String, String> metricTags; private final ChannelBuilder channelBuilder; private final int maxReceiveSize; private final boolean metricsPerConnection; private final IdleExpiryManager idleExpiryManager; }Copy the code

The core of it, at the bottom of Kafka Elector, is actually aSelector that encapsulates the native Java NIO, and the key component is a multiplexing component, which is called by a thread and listens directly for requests and responses from multiple network connections. The parameters are defined as follows:

  • MaxReceiveSize: indicates the maximum amount of data that can be received

  • ConnectionsMaxIdle: The maximum amount of time each network connection can be idle is reclaimed

  • Map<String, KafkaChannel> channels, which stores the mapping between each broker ID and Channel. For each broker, there is a network connection. Each connection has a corresponding SocketChannel in the semantics of NIO. We estimate that KafkaChannel encapsulates a SocketChannel

  • List completedSends, requests that have been sent successfully

  • List completedReceives, the response that has been received back and is processed

  • Map<KafkaChannel, Dequeue>, the responses received by each Broker that have not yet been processed

  • Conneted, disconnected, failedSends, brokers that have successfully established a connection, brokers that have not successfully established a connection, brokers that sent requests that failed

  • Channels broker ID corresponds to a network connection, and a network connection corresponds to a KafkaChannel. The bottom layer corresponds to a SocketChannel, which corresponds to a Socket at the lowest level of network communication. Socket communication, TCP

  • Send is a request that the underlying Channel sends out, and it can change over time, because once you Send a request, you Send the next one

  • NetworkReceive, the last response read by this Channel, is stored here, and is constantly changing because new responses are read

  • TransportLayer is a SocketChannel that encapsulates the underlying Java NIO

The source code inside Selector must take you into every detail of the study, because this is a set of network communication framework that has completely experienced a large number of complex, large-scale scenarios around the world, a set of network communication framework based on NIO encapsulation

KafkaChannel and PlaintextTransportLayer are as follows:

public class KafkaChannel { private final String id; private final TransportLayer transportLayer; private final Authenticator authenticator; private final int maxReceiveSize; // Read buffer, private NetworkReceive; // Write cache, the underlying use of ByteBuffer private Send Send; // Track connection and mute state of channels to enable outstanding requests on channels to be // processed after the channel is disconnected. private boolean disconnected; private boolean muted; } public class PlaintextTransportLayer implements TransportLayer { private final SelectionKey key; private final SocketChannel socketChannel; private final Principal principal = KafkaPrincipal.ANONYMOUS; }Copy the code

How to initialize a Network, a Selector, or a Channel, kafka encapsulates it, and how it works with native Java NIO’s Selector and Channel.

The code analysis

InitiateConnect (Node, now) initiateConnect(node, now)

private void initiateConnect(Node node, long now) { String nodeConnectionId = node.idString(); try { log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); this.connectionStates.connecting(nodeConnectionId, now); //TODO attempts to establish a connection selector. Connect (nodeConnectionId, new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ connectionStates.disconnected(nodeConnectionId, now); /* maybe the problem is our metadata, update it */ metadataUpdater.requestUpdate(); log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); }}Copy the code

Then see org.apache.kafka.common.net work. The Selector of the connect () method:

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { if (this.channels.containsKey(id)) throw new IllegalStateException("There is  already a connection for id " + id); // For those of you who are familiar with Java NIO programming, this code is just basic // code, exactly the same as our NIO programming code. SocketChannel SocketChannel = socketchannel.open (); / / set to non-blocking mode socketChannel configureBlocking (false); Socket socket = socketChannel.socket(); socket.setKeepAlive(true); // Set some parameters // These network parameters, which we showed you before when analyzing Producer // all have some default values. if (sendBufferSize ! = Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setSendBufferSize(sendBufferSize); if (receiveBufferSize ! = Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setReceiveBufferSize(receiveBufferSize); // The default value of this is false, which means that Nagle's algorithm is enabled // It collects small packets from the network into one large packet and then sends it out. Because it thinks that if the network has a large number of small packets in transmission // actually will affect the network congestion. // Kafka must not set this to false, because we sometimes have packets that are too small for us to send. socket.setTcpNoDelay(true); boolean connected; Try {// Try to connect to the server. // Because this is non-blocking // it may succeed immediately, and return true if successful, or it may take a long time to succeed, and return false. connected = socketChannel.connect(address); } catch (UnresolvedAddressException e) { socketChannel.close(); throw new IOException("Can't resolve address: " + address, e); } catch (IOException e) { socketChannel.close(); throw e; } //SocketChannel registers an OP_CONNECT SelectionKey on the Selector. Key = socketchannel. register(nioSelector, SelectionKey.OP_CONNECT); / / according to according to the SocketChannel encapsulation out a KafkaChannel KafkaChannel channel. = channelBuilder buildChannel (id, key, maxReceiveSize); Attach (channel) KafkaChannel; attach(channel) KafkaChannel; This.channels. Put (id, channel); // So normally, the network cannot complete the connection here. // If the connection cannot be completed here. Can you guess where Kafka makes its final connection to the network? if (connected) { // OP_CONNECT won't trigger for immediately connected channels log.debug("Immediately connected to node  {}", channel.id()); immediatelyConnectedKeys.add(key); // Unregister the OP_CONNECT event. key.interestOps(0); }}Copy the code

The parameters of the SocketChannel

  • ConfigureBlocking is set to non-blocking mode

  • The meaning of EPalive is mainly to prevent either client or server from being disconnected from the network without others knowing. So after setting this, if there is no communication between the two parties within 2 hours, then send a probe packet and stay connected, reconnect or disconnect based on the results of the probe packet

  • Set the size of the socket’s send and receive buffers to 128KB and 32KB, respectively. The size of the buffer is usually set in NIO programming

  • TcpNoDelay, if the default is set to false, then start Nagle algorithm, is to collect some small packets in network communication, assemble into a large packet and then send out once again, if a large number of small packets in transmission, will lead to network congestion; If set to true, this means that Nagle is turned off and the packets you send are immediately sent over the network, so be careful about this parameter

NetworkClient, Selector, KafkaChannel, ConnectStates, these are things that are extremely valuable for us to study, and it’s very good for us to build the foundation of our technology, assuming that we’re really going to develop a network communication program, and we’re going to do it based on NIO

For clients, how should their SocketChannel be set? You can do things like KeepAlive, TcpNoDelay, SocketBuffer

connect(address)

If the SocketChannel is is set to a non-blocking mode, then calls to the connect method, to initialize a non-blocking connection request, and if the connection immediately launched a successful, such as the client to connect with the server on one machine, there will be a immediately at this time the connection is successful, It then returns a true; Otherwise, if the connection is not immediately successful, false is returned and SocketChannel’s finishConnect method is later called to complete the connection

Cache kafkaChannel

Initialize a SocketChannel and then make a connection request, and then whether the connection request is successful or not, you need to cache the SocketChannel, and then you can make a connection based on that thing, or make a read or write request

So once you make a connection, you register that SocketChannel directly with the Selector, and you have the Selector watch for that SocketChannel’s OP_CONNECT event, which is if someone agrees to make a connection with it, and it gets a SelectionKey

Brokerid encapsulates SelectionKey and BrokerId as KafkaChannel, which encapsulates SelectionKey first in the TransportLayer. Authenticator, BrokerID, directly encapsulates a KafkaChannel

Socketchannels are obtained from the SelectionKey when network requests and processing are made through the SelectionKey. You can retrieve a core component that was attached before, replicating the handling of the request response. Caches the SelectionKey that establishes the connection immediately

conclusion

Native NIO programming, you can put his native API and components package, can be based on your own needs to achieve different functions, you must learn to carry out a certain cache mechanism design, such as for a number of machines to connect, then the corresponding connection components need to cache

Reference Documents:

Kafka 0.10.2.0-src kafka 0.10.2.0-src kafka 0.10.2.0-src

Kafka technology insider – Graphic details kafka source code design and implementation

Kafka source code analysis series