Sorted into hyperledger fabric source | kafka consensus
Article and code: github.com/blockchainG…
Branches: v1.1.0
An overview of the
Orderer Consensus component provides HandleChain() method to create consensus component Chain objects (consensus.Chain interface) for channel binding, including Solo (solo.chain type), Kafka (Kafka. ChainImpl type), etc. An important implementation module that belongs to the channel consensus component and is set to the cs.chain field of the Chain support object. The consensus component chain object provides Orderer consensus ordering service, which is responsible for ordering transactions on associated channels, packaging blocks, submitting ledger, channel management, etc. Currently, Golang channel or Kafka cluster is used as the consensus ordering back end to receive and sort transaction messages filtered and forwarded by Broadcast service.
Kafka consensus sorting service
Orderer service cluster
Orderer node uses Sarama open source Kafka third-party library to construct Kafka consensus components, which can accept and process transaction message requests sent by multiple clients at the same time, effectively improving the concurrency ability of Orderer node to process transaction messages. At the same time, we can use the function of Kafka cluster in a single partition to collect the same topic messages sequentially (message sequence number is unique) to ensure that the transaction messages have a deterministic order (sorting by message sequence number), so as to achieve a global consensus on the purpose of transaction ordering.
Kafka producers produce and publish messages by Topic, and The Kafka server cluster automatically classifies message topics. Messages on the same topic are collected into one or more partition files and appended to the end of the file in FIFO order. Each message has an OFFSET position in the partition as the unique ID of the message. Currently, Hyperledger Fabric binds a topic (i.e. chainID, chainID) for each channel created based on a Kafka cluster, and sets only one partition (partition number 0). Kafka consumers manage multiple partition consumers and subscribe to the topic messages of the specified partition, including the topic (chainID), the partition number (currently there is only one partition with partition number 0), and the start offset (the message location from which the subscription started).
Hyperledger Fabric uses a Kafka cluster to sort transaction messages submitted by a single or multiple Orderer sorting nodes. At this point, the Orderer ordering node acts as both the message producer (partition) and the consumer of the Kafka cluster, publishing and subscribing messages to the same topic partition on the Kafka cluster, that is, forwarding the transaction messages submitted by the Peer node to the Kafka server. At the same time, Retrieves sorted transaction messages from the Kafka partition on the specified topic and automatically filters restarted transaction messages. During this period, there may be network delay resulting in the difference in message acquisition time. If packet loss is not taken into account, the order and number of messages to be obtained by all Orderer nodes should be determined and consistent. At the same time, the same Kafka consensus component chain objects and block generation rules are adopted to ensure that all Orderer nodes can create and update the same configuration channel, and cut to generate the same batch transaction set block, and then “synchronize” to construct the same block data, so as to achieve global consensus based on Kafka cluster. To ensure global consistency of block data.
Start the consensus component chain object
Startup entry:
orderer/consensus/kafka/chain.go/Start()
func (chain *chainImpl) Start(a) {
go startThread(chain)
}
Copy the code
func startThread(chain *chainImpl){...// Create a kafka producer
chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
...
// The Kafka producer sends a CONNECT message to establish a connection
iferr = sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel); err ! =nil {
logger.Panicf("[channel: %s] Cannot post CONNECT message = %s", chain.channel.topic(), err)
}
...
// Create Kafka consumer
chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
...
// create a Kafka partition consumer
chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)...close(chain.startChan) // The consensus component chain object has been started, and Broadcast is not blocked
chain.errorChan = make(chan struct{}) // Create the errorChan channel without blocking the Deliver service processing handle. chain.processMessagesToBlocks()// Create a message processing loop to process the messages received on the subscription partition
}
Copy the code
The startThread function first creates the Kafka producer and publishes the message to the channel partition (chain-.channel) with the specified topic (channel ID) and partition number.
The connection is then established by sending a CONNECT message specifying the Topic field for the chain ID, the Key field for the partition number 0, the Value field for the CONNECT type message payload, and so on. The Message is received by the Kafka (partition) consumer that subscribes to the topic.
Next, create a Kafka consumer object that specifies the Kafka partition and Broker server configuration, and set up to get messages from the Kafka partition that specifies the topic (chain ID) and partition number (0).
Finally, the processMessagesToBlocks() method is called to create a message-processing loop that processes the subscription messages received from the Kafka cluster.
Process the message
ProcessMessagesToBlocks receives a normal Kafka partition consumer message and processes it based on the type of Kafka message, including the following types:
- Kafka- Message_Regular
- KafkaMessage_TimeToCut
- KafkaMessage_Connect
func (chain *chainImpl) processMessagesToBlocks(a) ([]uint64, error){...for { // Message processing loop
select{...case in, ok := <-chain.channelConsumer.Messages(): // A normal Kafka partition consumer message was received.select {
case <-chain.errorChan: // If this channel was closed... // If the channel is already closed, recreate it.switch msg.Type.(type) { // Analyze the Kafka message type
case *ab.KafkaMessage_Connect: // The Kafka connection message resumes the Kafka consumer partition subscription process due to an error
_ = chain.processConnect(chain.ChainID()) // Process the CONNECT connection message without doing anything
counts[indexProcessConnectPass]++ // The count of successfully processed messages increases by 1
case *ab.KafkaMessage_TimeToCut: // Kafka generates block messages with timed cuts
iferr := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err ! =nil {
logger.Warningf("[channel: %s] %s", chain.ChainID(), err)
logger.Criticalf("[channel: %s] Consenter for channel exiting", chain.ChainID())
counts[indexProcessTimeToCutError]++
return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point
}
counts[indexProcessTimeToCutPass]++ // The count of successfully processed messages increases by 1
case *ab.KafkaMessage_Regular: // Kafka regular messages
iferr := chain.processRegular(msg.GetRegular(), in.Offset); err ! =nil { // Process Kafka normal messages. counts[indexProcessRegularError]++ }... }case <-chain.timer: // Timeout timer
if err := sendTimeToCut(chain.producer, chain.channel, chain.lastCutBlockNumber+1, &chain.timer); err ! =nil { // Send a message of type TimeToCut requesting to pack blocks. counts[indexSendTimeToCutError]++ } ... }}}Copy the code
① : KafkaMessage_Connect type message
The Kafka connection message is used to test the working status of connected Kafka partition consumers, to verify the normal working status of Kafka consensus components and troubleshoot, and call chain-processConnect (chain-.chainid ()) method to process the message.
② : KafkaMessage_TimeToCut message
The processMessagesToBlocks() method calls the chain-.processTimetocut () method to handle messages of type TIMETOCUT. If the block number ttcNumber in the message is not the block number of the next packaged block in the current channel ledger of the current Orderer node (lastCutBlockNumber+1), it is discarded without processing. Otherwise, the BlockCutter().cut () method is called to Cut the list of cached transaction messages to be processed on the channel into batch ([]* Cb.envelope), and then the CreateNextBlock(Batch) method is called to construct a new block and submit the ledger. Finally, the WriteBlock(metadata) method is called to update the block metadata and submit the ledger, while the lastCutBlockNumber of the Kafka Consensus component chain object is updated with an increment of 1.
In fact, the points in time at which the Orderer service cluster nodes independently package blocks are usually not completely synchronized, and it is possible to repeatedly receive TIMETOCUT type messages (duplicate block numbers) submitted by other Orderer nodes. At this point, the Orderer node takes the first TIMETOCUT message it receives, packs it up, submits it to the ledger, and updates the latest block number lastCutBlockNumber for the current channel. Thus, the processTimeToCut() method can use the latest lastCutBlockNumber to filter out other duplicate TIMETOCUT messages to ensure data synchronization of ledger block files on all Orderer nodes. In effect, the original time synchronization mechanism is converted into a message synchronization mechanism.
③ : KafkaMessage_Regular message
This includes channel configuration transaction messages (type KafkaMessageRegular_CONFIG) and normal transaction messages (type KafkaMessageRegular_NORMAL). The detailed analysis will be shown in the processRegular method.
Process configuration transaction messages
Let’s take a quick look at the section of ProcessRegular’s code that handles configuration transaction messages, and since it’s quite long, we need to start with an overview:
func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error{... commitConfigMsg :=func(message *cb.Envelope, newOffset int64){... } seq := chain.Sequence()// Get the latest configuration number of the current channel.switch regularMessage.Class {
case ab.KafkaMessageRegular_UNKNOWN: // Unknown message type.case ab.KafkaMessageRegular_NORMAL: // Normal transaction message type.case ab.KafkaMessageRegular_CONFIG: // The channel configures the transaction message. }... }Copy the code
KafkaMessageRegular_CONFIG case ab.kafkamessageregular_config
(1) : if regularMessage. OriginalOffset to 0
Note This is a channel configuration transaction message for refiltering validation and sorting.
1.1 Filtering Messages that are repeatedly submitted
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {}
Copy the code
1.2 Verify that the configuration transaction message is recently revalidated and reordered, and that the channel configuration serial number is up to date
if regularMessage.OriginalOffset == chain.lastResubmittedConfigOffset &®ularMessage.ConfigSeq == seq {
// Therefore, the channel is closed and the Broadcast service processing handle is unblocked waiting to receive the message again for processing
close(chain.doneReprocessingMsgInFlight)
}
Copy the code
1.3 actively update the initial offset lastResubmitted of the configured transaction message of the lastResubmitted order of this channel
Another Orderer node resubmitted the configuration message, but the local Orderer node did not resubmit the message. Therefore, it is necessary to update the initial offset of the configured transaction message lastResubmitted for the lastResubmitted order of this channel.
if chain.lastResubmittedConfigOffset < regularMessage.OriginalOffset {
chain.lastResubmittedConfigOffset = regularMessage.OriginalOffset
}
Copy the code
(2) : regularMessage. OriginalOffset is 0
This indicates that the channel configuration transaction message is submitted for the first time, rather than being revalidated and reordered.
2.1 If the configuration number regularmessage. ConfigSeq in a message is smaller than the latest configuration number seq of the current channel
The channel configuration has been updated (the configuration number is higher), and the current configuration transaction message is processed (the configuration number is lower). ProcessConfigMsg will be called to refilter and process the message.
The configuration message is then resubmitted through configure to sort, resetting the initial offset of the message. It then updates the offset of the most recently resubmitted message.
if regularMessage.ConfigSeq < seq {
...
configEnv, configSeq, err := chain.ProcessConfigMsg(env)
iferr := chain.configure(configEnv, configSeq, receivedOffset); err ! =nil{... }// Blocks receiving message processing to update the offset of the most recently resubmitted message
chain.lastResubmittedConfigOffset = receivedOffset
// Create a channel to block the Broadcast service from receiving processing messages
chain.doneReprocessingMsgInFlight = make(chan struct{})}Copy the code
③ : Submit the configuration transaction message to perform channel management operations
Filter out the unqualified cases through ① and ② above, then commit configuration transaction message to perform channel management operation, core function: commitConfigMsg(env, offset)
3.1 Cut the current cached transaction messages into batch transaction sets
batch := chain.BlockCutter().Cut()
Copy the code
3.2 Create a new block
block := chain.CreateNextBlock(batch)
Copy the code
3.3 Constructing Kafka Metadata
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{ // Construct Kafka metadata
LastOffsetPersisted: receivedOffset - 1.// The offset is reduced by 1
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
Copy the code
3.4 Writing blocks
Submit a new block to the ledger via the block write component, updating the current channel’s latest block number, chain.lastCutBlockNumber, by 1
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
Copy the code
Then update the chain’s lastoriginal-offset argument to newOffset and do much the same as above:
chain.lastOriginalOffsetProcessed = newOffset
block := chain.CreateNextBlock([]*cb.Envelope{message}) // Create a new block
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{ // Construct Kafka metadata
LastOffsetPersisted: receivedOffset,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteConfigBlock(block, metadata) // Write the configuration block
chain.lastCutBlockNumber++ // New block number increment by 1
Copy the code
WriteBlock and WriteConfigBlock are both commitblocks that are called as follows:
func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte){...// Add block signatures
bw.addBlockSignature(bw.lastBlock)
// Add the latest configuration signature
bw.addLastConfigSignature(bw.lastBlock)
// Write a new block
err := bw.support.Append(bw.lastBlock)
...
}
Copy the code
Next, we’ll discuss how the Kafka Consensus component handles ordinary transaction messages.
Process ordinary transaction messages
Going back to the processRegular method, the general method for handling normal messages is as follows:
func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error{...case ab.KafkaMessageRegular_NORMAL: // Normal transaction message type
// If OriginalOffset is not 0, the message is revalidated and resubmitted
ifregularMessage.OriginalOffset ! =0{.../ / if the news offset is not more than lastOriginalOffsetProcessed has recently been offset processing messages,
// the message has already been processed, in which case the return should be discarded to prevent reprocessing of ordinary transaction messages with the same offset submitted by other Orderers
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
...
}
// // Check whether the channel configuration serial number is updated
if regularMessage.ConfigSeq < seq {
...
//// The configuration sequence number of the message is low, and you need to verify the filtering message again
configSeq, err := chain.ProcessNormalMsg(env)
...
// Resubmit the normal trade message
iferr := chain.order(env, configSeq, receivedOffset); err ! =nil{}... }// advance lastOriginalOffsetProcessed iff message is re-validated and re-ordered
/ / if and only if the message validation and reorder again, only need to modify lastOriginalOffsetProcessed offset
offset := regularMessage.OriginalOffset
if offset == 0 {
offset = chain.lastOriginalOffsetProcessed
}
// Submit ordinary transaction messages. Offset is the offset of the most recently processed ordinary transaction messages
commitNormalMsg(env, offset)
}
Copy the code
The process of processing ordinary transaction messages is similar to the process of configuring transaction messages. CommitNormalMsg (env, offset)
commitNormalMsg := func(message *cb.Envelope, newOffset int64) {
//// Add the received messages to the cached transaction message list and cut them into a batch transaction set list
batches, pending := chain.BlockCutter().Ordered(message)
...
if len(batches) == 0 {
// If there is no batch transaction set, start the timer to periodically send the cut out block message n
chain.lastOriginalOffsetProcessed = newOffset
if chain.timer == nil {
chain.timer = time.After(chain.SharedConfig().BatchTimeout())
...
return
}
chain.timer = nil
offset := receivedOffset // Set the current message offset
if pending || len(batches) == 2 {
offset-- // Calculate the offset of the first batch trade message as offset minus 1
} else { // There is only one batch transaction set that constitutes one block
//// Sets the message offset for the first batch trade set to newOffset
chain.lastOriginalOffsetProcessed = newOffset
}
//// construct and submit the first block
block := chain.CreateNextBlock(batches[0])
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: offset,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteBlock(block, metadata) // Update block metadata and commit blocks to ledger
chain.lastCutBlockNumber++ // Update the block number of the latest block on the current channel by 1.// Commit the second block if exists
//// examines the second batch trade set, constructs and commits the second block
if len(batches) == 2 {
chain.lastOriginalOffsetProcessed = newOffset
offset++ // Set the message offset of the second batch trade set to 1
block := chain.CreateNextBlock(batches[1]) metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{ LastOffsetPersisted: offset, LastOriginalOffsetProcessed: newOffset, LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset, }) chain.WriteBlock(block, metadata) chain.lastCutBlockNumber++ ... }}Copy the code
The new ordinary trade messages are first added to the current cached trade list and sliced into a batch trade set list of batches, containing a maximum of two batches of batches and a second batch of batches containing a maximum of one transaction. WriteBlock is also ultimately called to write to the ledger.
At this point the entire processRegular() method finishes processing the message.
Summary and Reference
Kafka consensus sort logic is actually relatively simple, roughly the flow is as follows:
Github.com/blockchainG… (Article picture code information in it)
Wechat official account: Blockchain technology stack