To learn RocketMQ, you need to understand two things: communication and storage. This article covers the communication side by building a Wireshark Lua plugin for RocketMQ.

Here are some things you can learn from reading this article.

  • How to write the skeleton code of a Wireshark Lua plugin
  • How to implement a Hello World plugin
  • The basic format of the RocketMQ communication protocol
  • What the Body looks like when a RocketMQ PULL response carries messages

The Hello World plugin

Wireshark’s About page shows the current version of Lua supported by Wireshark.

As you can see, Lua version 5.2.4 is currently supported. Let’s look at some skeleton code.

-- Declare the protocol
local NAME = "RocketMQ"
local PORTS = { 9876, 10911 }
local proto = Proto.new(NAME, "RocketMQ Protocol")

-- Declare the dissector function that handles packets
function proto.dissector(tvb, pinfo, tree)
    print("load plugin... demo")
    pinfo.cols.protocol = proto.name
    pinfo.cols.info = "Hello, World"
end

-- Register the dissector with Wireshark
for _, port in ipairs(PORTS) do
    DissectorTable.get("tcp.port"):add(port, proto)
end

Find the Wireshark plugin directory. On my computer the path is /Applications/Wireshark.app/Contents/Resources/share/wireshark. Edit the init.lua file in that directory:

vim /Applications/Wireshark.app/Contents/Resources/share/wireshark/init.lua

Add a dofile call that loads the Lua file above.

dofile("/path/to/demo.lua")

The results before and after execution are as follows.

Parse the RocketMQ protocol

The communication protocol of RocketMQ is relatively simple, and the overall protocol format is shown below.

RocketMQ’s communication protocol consists of four parts:

  • Part 1: The first four bytes give the total length of the remaining three parts (not including these four bytes themselves)
  • Part 2: The next four bytes give the length of the Header
  • Part 3: The next Header Length bytes are the protocol Header, stored as serialized JSON. It identifies the type of request or response
  • Part 4: The Body content

Take an actual package as an example:

The first four bytes, 00 00 01 9B, give the length of the rest of the packet: 411 (0x019B). The next four bytes, 00 00 00 D4, give the Header length: 212 (0xD4). The following 212 bytes are the Header, which, as you can see, is a JSON string. The last 195 (411 - 4 - 212) bytes are the actual content of the Body. The specific message format is discussed below.
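
The length arithmetic above can be checked with a few lines of plain Lua outside Wireshark. This is only a minimal sketch: readUInt32BE and splitFrame are hypothetical helper names that are not part of the plugin, the sample frame is made up, and the code assumes the string holds exactly one complete frame.

```lua
-- Read a 4-byte big-endian unsigned integer starting at 1-based position pos.
local function readUInt32BE(data, pos)
    local b1, b2, b3, b4 = string.byte(data, pos, pos + 3)
    return ((b1 * 256 + b2) * 256 + b3) * 256 + b4
end

-- Split one complete frame into its header and body strings.
-- Hypothetical helper: assumes `data` contains exactly one whole frame.
local function splitFrame(data)
    local totalLength = readUInt32BE(data, 1)   -- everything after the first 4 bytes
    local headerLength = readUInt32BE(data, 5)  -- length of the JSON header
    local bodyLength = totalLength - 4 - headerLength
    local header = string.sub(data, 9, 8 + headerLength)
    local body = string.sub(data, 9 + headerLength, 8 + headerLength + bodyLength)
    return totalLength, headerLength, header, body
end

-- A made-up frame: an 11-byte JSON header followed by a 5-byte body.
local sampleHeader = '{"code":11}'
local sampleBody = "hello"
local frame = string.char(0, 0, 0, 4 + #sampleHeader + #sampleBody)
    .. string.char(0, 0, 0, #sampleHeader)
    .. sampleHeader .. sampleBody

local total, headerLen, header, body = splitFrame(frame)
print(total, headerLen, header, body)
```

The same subtraction, total length minus 4 minus header length, is what yields the 195-byte Body in the captured packet.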

Next, let’s write the program for parsing.

The parsing logic lives in the proto.dissector function, whose signature is shown below.

function proto.dissector(tvb, pinfo, tree)
end

These parameters are interpreted as follows:

  • tvb stands for Testy Virtual Buffer. It holds the contents of the packet
  • pinfo is short for Packet Information. It carries information about the packet, such as its source and destination ports
  • tree is the display tree of the Wireshark UI. The information obtained by parsing the packet is added to this hierarchical tree

Next we display the four parts of a RocketMQ packet in Wireshark. The modified proto.dissector function is shown below.

function proto.dissector(tvb, pinfo, tree)
    print("load plugin... demo")

    local subtree = tree:add(proto, tvb())
    pinfo.cols.protocol = proto.name
    pinfo.cols.info = ""

    -- Part 1: total length of everything after these four bytes
    local length = tvb(0, 4):uint()
    subtree:add("Total Length", length)
    -- Part 2: length of the JSON header
    local headerLength = tvb(4, 4):uint()
    subtree:add("Header Length", headerLength)
    -- Part 3: the JSON header itself
    local headerData = tvb(8, headerLength):string()
    subtree:add("Header", headerData)
    -- Part 4: the body is whatever remains after the header
    local bodyDataLen = length - 4 - headerLength
    local bodyData = tvb(8 + headerLength, bodyDataLen):string()
    subtree:add("Body", bodyData)
end

Reload the Lua script and you can see that several parts of the RocketMQ protocol are displayed in Wireshark.

To distinguish requests from responses, we can check the destination port number. Add a helper function:

function isRequest(pinfo)
    local dstPort = pinfo.dst_port;
    for _, port in ipairs(PORTS) do
        if (dstPort == port) then
            return true
        end
    end
    return false
end

In proto.dissector, use it to label each packet as a request or a response, which makes the Info column more readable.

if (isRequest(pinfo)) then
    pinfo.cols.info:append("[REQUEST] ")
else
    pinfo.cols.info:append("[RESPONSE] ")
end

The effect is shown below.
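
The port check can also be tried outside Wireshark. The sketch below is a variant, not the plugin's code: it turns PORTS into a lookup set so each check is a single table access, and directionLabel is a hypothetical name. A plain table with a dst_port field stands in for pinfo. Note that this heuristic assumes the client never uses one of the listed ports as its own source port.

```lua
local PORTS = { 9876, 10911 }

-- Build a set once so each lookup is a single table access.
local PORT_SET = {}
for _, port in ipairs(PORTS) do
    PORT_SET[port] = true
end

-- Return the label for a packet, given an object with a dst_port field.
-- In Wireshark that object would be pinfo; here a plain table stands in for it.
local function directionLabel(packetInfo)
    if PORT_SET[packetInfo.dst_port] then
        return "[REQUEST]"
    end
    return "[RESPONSE]"
end

print(directionLabel({ dst_port = 10911 }))  -- packet sent to a listed port
print(directionLabel({ dst_port = 53211 }))  -- packet sent to some other port
```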

Next, we parse the JSON and display it more clearly, starting with headers and bodies that are in JSON format, for both requests and responses. Add a recursive function that handles JSON data uniformly.

-- k and v are the key and value of a JSON entry; tree is the UI tree
function parseAndAddTree(k, v, tree)
    if (type(v) == 'table') then
        local sizeStr = ""
        if (#v > 0) then
            sizeStr = "size: " .. #v
        end
        local childTree = tree:add(k, sizeStr, tree)
        for key, value in pairs(v) do
            parseAndAddTree(key, value, childTree)
        end
    else
        tree:add(k .. ":", json.stringify(v))
    end
end

Add Header parsing to the proto.dissector method, as shown below.

local subtree = tree:add(proto, tvb())
local headerTree = subtree:add("Header", "")
-- parse the JSON header (json is a JSON library loaded separately)
local header = json.parse(headerData, 1, "}")

for k, v in pairs(header) do
    parseAndAddTree(k, v, headerTree)
end

Reload and run the above code with the effect shown below.
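
To see what the recursive walk produces without starting Wireshark, a stand-in tree can record the lines it is given. This is a minimal sketch under assumptions: MockTree and walk are hypothetical names, tostring() replaces json.stringify, and the input table is hand-written rather than a real decoded header.

```lua
-- Minimal stand-in for a Wireshark TreeItem: add() records one line of text
-- and returns a child node, which is all the recursive walk relies on.
local MockTree = {}
MockTree.__index = MockTree

function MockTree.new(depth, lines)
    return setmetatable({ depth = depth or 0, lines = lines or {} }, MockTree)
end

function MockTree:add(label, value)
    local text = string.rep("  ", self.depth) .. tostring(label)
    if value ~= nil and value ~= "" then
        text = text .. " " .. tostring(value)
    end
    table.insert(self.lines, text)
    return MockTree.new(self.depth + 1, self.lines)
end

-- Same recursion as parseAndAddTree, with tostring() in place of json.stringify.
local function walk(k, v, tree)
    if type(v) == "table" then
        local sizeStr = ""
        if #v > 0 then
            sizeStr = "size: " .. #v
        end
        local childTree = tree:add(k, sizeStr)
        for key, value in pairs(v) do
            walk(key, value, childTree)
        end
    else
        tree:add(k .. ":", tostring(v))
    end
end

local root = MockTree.new()
-- A hand-written table shaped like a decoded header (values are made up).
walk("extFields", { topic = "TopicTest" }, root)
walk("code", 11, root)
for _, line in ipairs(root.lines) do
    print(line)
end
```

Nested tables become child nodes and scalar values become leaf lines, which mirrors the hierarchy Wireshark shows in its detail pane.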

The RocketMQ source code also defines readable names for the request and response codes, which we can map as follows:

local requestCodeMap = {
    [10] = "SEND_MESSAGE",
    [11] = "PULL_MESSAGE",
    [12] = "QUERY_MESSAGE",
    -- ...
}
local responseCode = {
    [0] = "SUCCESS",
    [1] = "SYSTEM_ERROR",
    [2] = "SYSTEM_BUSY",
    -- ...
}
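
One way to use such tables is a small lookup with a fallback for codes that are not listed. The sketch below is an assumption about how the lookup might be wired up, not the plugin's actual code: describeCode is a hypothetical helper, the tables repeat only the three entries shown above, and the numeric code is assumed to come from the code field of the decoded JSON header.

```lua
local requestCodeMap = {
    [10] = "SEND_MESSAGE",
    [11] = "PULL_MESSAGE",
    [12] = "QUERY_MESSAGE",
}
local responseCodeMap = {
    [0] = "SUCCESS",
    [1] = "SYSTEM_ERROR",
    [2] = "SYSTEM_BUSY",
}

-- Hypothetical helper: turn a numeric code into "NAME(code)",
-- falling back to "UNKNOWN(code)" for codes missing from the table.
local function describeCode(code, isRequestPacket)
    local map = isRequestPacket and requestCodeMap or responseCodeMap
    return (map[code] or "UNKNOWN") .. "(" .. code .. ")"
end

print(describeCode(11, true))
print(describeCode(0, false))
print(describeCode(999, true))
```

Keeping the numeric code in the label means an unmapped code is still visible in the Info column.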

The same approach works when the Body is a JSON string, as shown below.

However, in some cases the Body is not a JSON string. For example, when a PULL request finds consumable messages, the server returns the Body in RocketMQ's custom message format rather than as a string, as shown below.
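
The start of each message in that format is a run of fixed-width fields. The sketch below lists them with the names and sizes used by the decoder later in this article and computes where each one starts. It is only an illustration: FIXED_FIELDS and fieldOffsets are hypothetical names, and the sizes assume 4-byte IPv4 host addresses, as that decoder does.

```lua
-- Fixed-width fields at the start of each message, in wire order.
-- Names and sizes follow the decoder in this article (IPv4 hosts assumed).
local FIXED_FIELDS = {
    { "totalSize", 4 }, { "magicCode", 4 }, { "bodyCRC", 4 },
    { "queueId", 4 }, { "flag", 4 }, { "queueOffset", 8 },
    { "physicOffset", 8 }, { "sysFlag", 4 }, { "bornTimeStamp", 8 },
    { "bornHost", 4 }, { "port", 4 }, { "storeTimestamp", 8 },
    { "storeHost", 4 }, { "storePort", 4 }, { "reconsumeTimes", 4 },
    { "preparedTransactionOffset", 8 },
}

-- Compute the byte offset at which each field starts,
-- and the total size of the fixed-width prefix.
local function fieldOffsets(fields)
    local offsets, offset = {}, 0
    for _, field in ipairs(fields) do
        offsets[field[1]] = offset
        offset = offset + field[2]
    end
    return offsets, offset
end

local offsets, fixedSize = fieldOffsets(FIXED_FIELDS)
print("queueOffset starts at byte " .. offsets.queueOffset)
print("fixed prefix is " .. fixedSize .. " bytes")
```

After this prefix come the variable-length parts: a 4-byte body length and the body, a 1-byte topic length and the topic, then a 2-byte properties length and the properties string.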

Writing this parser is tedious manual work. I implemented a Lua version by following RocketMQ's Java source code. The complete code is shown below.

function decodeMessageExt(bodyTree, pinfo, bodyData)
    local bodyTree = bodyTree:add("Body", "")

    pinfo.cols.info:append(">>>>#FOUND#")

    local offset = 0;

    bodyTree:add("totalSize", bodyData(offset, 4):int())
    offset = offset + 4;

    local magicCode = string.format("0x%8.8X", bodyData(offset, 4):uint())
    bodyTree:add("magicCode", magicCode)
    offset = offset + 4;

    bodyTree:add("bodyCRC", bodyData(offset, 4):int())
    offset = offset + 4;

    bodyTree:add("queueId", bodyData(offset, 4):int())
    offset = offset + 4;

    bodyTree:add("flag", bodyData(offset, 4):int())
    offset = offset + 4;

    bodyTree:add("queueOffset", bodyData(offset, 8):int64():tonumber())
    offset = offset + 8;

    bodyTree:add("physicOffset", bodyData(offset, 8):int64():tonumber())
    offset = offset + 8;

    bodyTree:add("sysFlag", bodyData(offset, 4):int())
    offset = offset + 4;


    bodyTree:add("bornTimeStamp", bodyData(offset, 8):int64():tonumber())
    offset = offset + 8;

    local bornHost = bodyData(offset, 1):uint()
            .. "." .. bodyData(offset + 1, 1):uint()
            .. "." .. bodyData(offset + 2, 1):uint()
            .. "." .. bodyData(offset + 3, 1):uint()
    bodyTree:add("bornHost", bornHost)
    offset = offset + 4;

    bodyTree:add("port", bodyData(offset, 4):int())
    offset = offset + 4;
    bodyTree:add("storeTimestamp", bodyData(offset, 8):int64():tonumber())
    offset = offset + 8;

    local storeHost = bodyData(offset, 1):uint()
            .. "." .. bodyData(offset + 1, 1):uint()
            .. "." .. bodyData(offset + 2, 1):uint()
            .. "." .. bodyData(offset + 3, 1):uint()
    bodyTree:add("storeHost", storeHost)
    offset = offset + 4;

    bodyTree:add("storePort", bodyData(offset, 4):int())
    offset = offset + 4;

    --13 RECONSUMETIMES
    bodyTree:add("reconsumeTimes", bodyData(offset, 4):int())
    offset = offset + 4;
    --14 Prepared Transaction Offset
    bodyTree:add("preparedTransactionOffset", bodyData(offset, 8):int64():tonumber())
    offset = offset + 8;
    --15 BODY
    local bodyLen = bodyData(offset, 4):int()
    --            bodyTree:add("bodyLen", bodyLen)
    offset = offset + 4;

    bodyTree:add("body:", bodyData(offset, bodyLen):string())
    offset = offset + bodyLen;

    --16 TOPIC
    local topicLen = bodyData(offset, 1):int()
    offset = offset + 1;
    --            bodyTree:add("topicLen", topicLen)
    local topic = bodyData(offset, topicLen):string()
    bodyTree:add("topic:", topic)
    pinfo.cols.info:append(" topic:" .. topic)
    offset = offset + topicLen;

    --17 properties
    local propertiesLength = bodyData(offset, 2):int()
    offset = offset + 2;
    bodyTree:add("propertiesLength", propertiesLength)

    if (propertiesLength > 0) then
        local propertiesStr = bodyData(offset, propertiesLength):string()
        offset = offset + propertiesLength
        local propertiesTree = bodyTree:add("propertiesStr", "size: " .. propertiesLength)
        for k, v in string.gmatch(propertiesStr, "(%w+)\1(%w+)") do
            propertiesTree:add(k, v)
        end
    end
end

The results of this run are shown below.
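
One detail of the properties loop is worth a closer look. The %w+ pattern matches only letters and digits, so a name or value containing characters such as _ or - would be cut short. The sketch below is a possible alternative rather than the plugin's code: it splits on the separator bytes themselves, assuming each name is separated from its value by byte 1 and each property ends with byte 2. parseProperties is a hypothetical helper, and the sample string is made up.

```lua
-- Separator bytes: \1 between a name and its value, \2 after each property.
local NAME_VALUE_SEPARATOR = "\1"
local PROPERTY_SEPARATOR = "\2"

-- Hypothetical helper: split "k1\1v1\2k2\1v2\2" into a Lua table.
-- Names and values may contain any byte except the two separators.
local function parseProperties(propertiesStr)
    local properties = {}
    local notSeparator = "[^" .. NAME_VALUE_SEPARATOR .. PROPERTY_SEPARATOR .. "]"
    local pattern = "(" .. notSeparator .. "+)" .. NAME_VALUE_SEPARATOR
        .. "(" .. notSeparator .. "*)"
    for name, value in string.gmatch(propertiesStr, pattern) do
        properties[name] = value
    end
    return properties
end

-- Made-up sample with an underscore in a name and a hyphen in a value.
local sample = "UNIQ_KEY\1ABC123\2TAGS\1tag-a\2"
local props = parseProperties(sample)
print(props.UNIQ_KEY, props.TAGS)
```

In the dissector, the returned table could then be walked with pairs() and each entry added to propertiesTree.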

The full code is on GitHub: github.com/arthur-zhan… Take a look if you are interested. Besides the functionality described above, it also extracts useful information, such as the topic, into the Info column, which makes the traffic easier to follow.

Afterword.

This was fun to tinker with. Besides OpenResty and Redis, Lua has plenty of interesting uses in backend development.

Writing this plugin gave me a clearer picture of the details of RocketMQ communication, which I may cover in a later article.

If you have any questions, you can scan the QR code below to follow my official account and contact me.