Kafka version: 0.9.0

In an earlier article, the author gave a comprehensive explanation of TCP sticky packets and split packets. Here is a quick summary:

The sticky packet and split packet problem arises because application-layer messages have boundaries that carry meaning, while the transport-layer protocol does not understand them. TCP will not segment or send data according to your business content; it only transmits a stream of bytes according to its own protocol format.

Once you understand the nature of the problem, it is easy to solve. After receiving data, the application layer has to decide, based on some marker, whether a complete message has arrived. If it has, the application parses the message and hands it to the business code for processing.

Ways to solve the sticky packet and split packet problem:

(1) Use a fixed message length;

(2) Add a special delimiter, such as a newline character at the end of each message;

(3) Use a user-defined protocol, such as len + data, where len is the length of data in bytes.
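As a minimal sketch of approach (3), the sender can prefix each message with its length. The class and method names here (LenDataEncoder, encode) are hypothetical and chosen for illustration only; the 4-byte big-endian length matches the int that Kafka reads, as described later in this article.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka: builds a len + data frame.
final class LenDataEncoder {

    // Returns a buffer holding a 4-byte length followed by the payload bytes.
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length); // len, big-endian by default
        frame.put(payload);           // data
        frame.flip();                 // make the frame ready for reading or writing out
        return frame;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(frame.remaining()); // 9 = 4 bytes of len + 5 bytes of data
        System.out.println(frame.getInt(0));   // 5
    }
}
```

Because the receiver always knows how many bytes to expect, it never has to scan the payload for a delimiter, and the payload may contain any byte values.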

How does Kafka solve the sticky packet and split packet problem?

Consider sticky packets first: when the receiver gets more data than a single message, how does it split the data and read each message correctly and completely?

Kafka uses the third approach above, a custom protocol format.

When Kafka receives data, it does something like this:

  1. Read the first 4 bytes and convert them to an int, which is the length;

  2. Allocate a memory buffer of that length;

  3. Read exactly that many bytes of data into the allocated buffer.
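The three steps above can be sketched on a single in-memory buffer that contains several messages stuck together. This is an illustrative simplification, not Kafka's code: the class StickyPacketSplitter and its split method are hypothetical names, and the input is assumed to be a ByteBuffer rather than a network channel.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: splits stuck-together len + data frames.
final class StickyPacketSplitter {

    // Extracts every complete frame from buf and leaves an incomplete tail unread.
    static List<byte[]> split(ByteBuffer buf) {
        List<byte[]> messages = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int len = buf.getInt();            // step 1: first 4 bytes -> int
            if (len < 0)
                throw new IllegalStateException("Invalid length: " + len);
            if (buf.remaining() < len) {
                buf.reset();                   // data not fully here yet, undo the length read
                break;
            }
            byte[] data = new byte[len];       // step 2: allocate a buffer of that length
            buf.get(data);                     // step 3: read exactly len bytes
            messages.add(data);
        }
        return messages;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(18);
        buf.putInt(2).put("ab".getBytes(StandardCharsets.UTF_8));
        buf.putInt(3).put("cde".getBytes(StandardCharsets.UTF_8));
        buf.putInt(10).put((byte) 'x');        // third frame is incomplete
        buf.flip();

        List<byte[]> messages = split(buf);
        System.out.println(messages.size());   // 2
        System.out.println(buf.remaining());   // 5 bytes of the incomplete frame remain
    }
}
```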

KafkaChannel.read() and the NetworkReceive methods it calls:

    public NetworkReceive read() throws IOException {
        NetworkReceive result = null;
        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id);
        }
        // read data
        receive(receive);
        // a complete message has been read
        if (receive.complete()) {
            receive.payload().rewind();
            result = receive;
            receive = null;
        }
        // null if the message is not complete yet
        return result;
    }

    private long receive(NetworkReceive receive) throws IOException {
        return receive.readFrom(transportLayer);
    }

    public long readFrom(ScatteringByteChannel channel) throws IOException {
        return readFromReadableChannel(channel);
    }

    public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
        int read = 0;
        // size is the 4-byte buffer that stores len (ByteBuffer.allocate(4));
        // it is created once per NetworkReceive, not on every call
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            // the length has been read in full
            if (!size.hasRemaining()) {
                // rewind so the len value can be read from the start
                size.rewind();
                // convert the 4 bytes to an int
                int receiveSize = size.getInt();
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                // allocate a buffer of exactly receiveSize bytes for the data
                this.buffer = ByteBuffer.allocate(receiveSize);
            }
        }
        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }
        return read;
    }

Next, look at the split packet case: the data received so far is not enough to form a complete message, so how does Kafka wait for the rest?

The key is the check in the receive.complete() method called above.

    public boolean complete() {
        return !size.hasRemaining() && !buffer.hasRemaining();
    }

  • !size.hasRemaining(): the len field has been read in full;

  • !buffer.hasRemaining(): the data has been read in full.

Both conditions must be true: a message counts as complete only after len and data have both been read in full.

If the data has not been fully read, receive.complete() returns false and read() returns null. The rest of the message is read when the next OP_READ event fires.
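To see how a message that arrives in pieces is accumulated, here is a self-contained sketch modeled on the size and buffer fields in the excerpt above. It is an assumption-laden simplification rather than Kafka's class: PartialReceive is a hypothetical name, and each "read event" is simulated by passing in a ByteBuffer fragment instead of reading from a socket channel.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical class, not part of Kafka: accumulates one len + data message across reads.
final class PartialReceive {
    private final ByteBuffer size = ByteBuffer.allocate(4); // holds len
    private ByteBuffer buffer;                              // holds data, allocated once len is known

    // Consumes as many bytes from src as this message still needs; returns the count.
    int readFrom(ByteBuffer src) {
        int read = 0;
        if (size.hasRemaining()) {
            read += transfer(src, size);
            if (!size.hasRemaining()) {
                size.rewind();
                int len = size.getInt();
                if (len < 0)
                    throw new IllegalStateException("Invalid receive (size = " + len + ")");
                buffer = ByteBuffer.allocate(len);
            }
        }
        if (buffer != null)
            read += transfer(src, buffer);
        return read;
    }

    // Same check as in the excerpt: len and data must both be fully read.
    boolean complete() {
        return !size.hasRemaining() && !buffer.hasRemaining();
    }

    byte[] payload() {
        return buffer.array();
    }

    private static int transfer(ByteBuffer src, ByteBuffer dst) {
        int n = Math.min(src.remaining(), dst.remaining());
        for (int i = 0; i < n; i++)
            dst.put(src.get());
        return n;
    }

    public static void main(String[] args) {
        byte[] frame = ByteBuffer.allocate(9)
                .putInt(5).put("hello".getBytes(StandardCharsets.UTF_8)).array();
        PartialReceive receive = new PartialReceive();

        receive.readFrom(ByteBuffer.wrap(frame, 0, 2)); // half of len
        System.out.println(receive.complete());         // false
        receive.readFrom(ByteBuffer.wrap(frame, 2, 4)); // rest of len + 2 bytes of data
        System.out.println(receive.complete());         // false
        receive.readFrom(ByteBuffer.wrap(frame, 6, 3)); // rest of data
        System.out.println(receive.complete());         // true
        System.out.println(new String(receive.payload(), StandardCharsets.UTF_8)); // hello
    }
}
```

The object keeps its position in both buffers between calls, so each later read simply continues where the previous one stopped.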

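A partially read message stays in the channel's receive field between reads, since read() resets that field to null only after the message is complete. Completed messages are then added to the stagedReceives data structure: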

    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
        NetworkReceive networkReceive;
        while ((networkReceive = channel.read()) != null)
            addToStagedReceives(channel, networkReceive);
    }

Official explanation for stagedReceives:

END