Screwing hyperledger fabric source | Deliver block distribution
Article and code: github.com/blockchainG…
Branches: v1.1.0
An overview of the
The Orderer sort server provides the block distribution service interface. It receives block request messages submitted by clients (Envelope type, channel header type DELIVER_SEEK_INFO, CONFIG_UPDATE, etc.) and searches for information objects (SeekInfo type) based on the block encapsulated by the message. It includes searching the oldest block SeekOldest type, searching the newest block SeekNewest type, searching the specified location block SeekSpecified type, etc., constructing the scope query result iterator corresponding to the request scope, reading the block data in the ledger of the designated channel of the Orderer node, and establishing a message processing cycle. Based on the result iterator, the requested block data results are read in turn and sent to request nodes such as the Leader master node of the organization.
When the Orderer node starts, it registers the Orderer ordering server on the local gRPC server and creates the Deliver service processing handle. When the client initiates a Deliver service request, the Orderer ordering server calls the Deliver() method to process the message request.
Diliver message service processing
Entrance to the orderer/common/server/server to go/Deliver () method:
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error{... policyChecker :=func(env *cb.Envelope, channelID string) error { // Define the policy inspector
chain, ok := s.GetChain(channelID) // Gets the chain support object for the specified channel
if! ok {return errors.Errorf("channel %s not found", channelID)
}
// Create a message filter
sf := msgprocessor.NewSigFilter(policies.ChannelReaders, chain)
return sf.Apply(env) // Filter messages
}
server := &deliverMsgTracer{
DeliverSupport: &deliverHandlerSupport{AtomicBroadcast_DeliverServer: srv},
msgTracer: msgTracer{
debug: s.debug,
function: "Deliver",}}// Deliver service message processing
return s.dh.Handle(deliver.NewDeliverServer(server, policyChecker, s.sendProducer(srv)))
}
Copy the code
I probably did the following things:
- Define policy checker: Checks that the received block request message must meet the requirements of the access control permission policy on the specified channel
- Gets the chain support object for the specified channel
- Create a message filter to filter messages
- The Deliver service message handles block requests
Let’s see how this works, entering S.dhandle:
/common/deliver/deliver.go/Handle
func (ds *deliverHandler) Handle(srv *DeliverServer) error{...// Wait for a message request and process it
for{... envelope, err := srv.Recv()// Waiting to receive block message requests from clients.// Fetch the specified block from the block ledger of the Orderer node's local specified channel and send the request to the client
iferr := ds.deliverBlocks(srv, envelope); err ! =nil {
returnerr } ... }}Copy the code
It goes without saying that directly into deliverBlocks, this part is the most core, step by step analysis is as follows:
PayLoad parses the PayLoad and checks the validity of the header and ChannelHeader
payload, err := utils.UnmarshalPayload(envelope.Payload) // Parse the message payload.if payload.Header == nil {}
// Parse channel headers
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
err = ds.validateChannelHeader(srv, chdr) // Verify that the channel header is valid
Copy the code
(2) Obtain chain support object (chainID) from the chain dictionary and check whether there is an error message in this object
chain, ok := ds.sm.GetChain(chdr.ChannelId) // Gets the chain support object for the specified channel
Copy the code
③ create an access control object and check whether the message signature complies with the specified channel read permission policy **
accessControl, err := newSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, crypto.ExpiresAt)
...
err := accessControl.evaluate()
Copy the code
④ : Parse the SeekInfo structure object for block search information
seekInfo := &ab.SeekInfo{}
iferr = proto.Unmarshal(payload.Data, seekInfo); err ! =nil {}
Copy the code
⑤ : Check the validity of the starting position and the ending position
if seekInfo.Start == nil || seekInfo.Stop == nil {}
Copy the code
⑥ : Create a block ledger iterator and get the start block number, and set the start position
cursor, number := chain.Reader().Iterator(seekInfo.Start)
Copy the code
Iterator calculates the start block number startingBlockNumbe based on the Type of startPosition.Type the startingBlockNumbe object.
-
SeekPosition_Oldest: Search for the oldest block and set startingBlockNumber to 0.
-
SeekPosition_Newest: Searches for the newest block, and sets startingBlockNumber to info. height-1, which is the latest block number of the current channel ledger minus 1.
-
SeekPosition_Specified: Search for the block at the Specified location and set startingBlockNumber to start.specification. Number.
Iterator methods roughly function is as follows: common/gotten/blockledger/file/impl. Go/Iterator
func (fl *FileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
var startingBlockNumber uint64
switch start := startPosition.Type.(type) { // Analyze the starting position type
case *ab.SeekPosition_Oldest: // Search for the oldest block with block number 0
startingBlockNumber = 0
case *ab.SeekPosition_Newest: // Search for the latest block
info, err := fl.blockStore.GetBlockchainInfo() // Get blockchain information
iferr ! =nil {
logger.Panic(err)
}
newestBlockNumber := info.Height - 1 // The latest block number
startingBlockNumber = newestBlockNumber
case *ab.SeekPosition_Specified: // Search for the specified location block
startingBlockNumber = start.Specified.Number
height := fl.Height()
if startingBlockNumber > height { // If the height is exceeded, an error is reported
return &blockledger.NotFoundErrorIterator{}, 0
}
default:
return &blockledger.NotFoundErrorIterator{}, 0
}
// Construct block iterators
iterator, err := fl.blockStore.RetrieveBlocks(startingBlockNumber)
iferr ! =nil {
return &blockledger.NotFoundErrorIterator{}, 0
}
// Construct ledger block iterators
return &fileLedgerIterator{ledger: fl, blockNumber: startingBlockNumber, commonIterator: iterator}, startingBlockNumber
}
Copy the code
⑦ : cyclic reading block data, block data within the specified block number range from the local block ledger, and sent to the request client in sequence
7.1 No Data Is Found
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
if number > chain.Reader().Height()- 1 {
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
}
Copy the code
7.2 Obtaining The Next Data
block, status := nextBlock(cursor, erroredChan) // Get the next block from the local ledger
ifstatus ! = cb.Status_SUCCESS {... }Copy the code
7.3 Checking whether the access control policy meets the requirements
iferr := accessControl.evaluate(); err ! =nil {}
Copy the code
7.4 Sending Block Data
iferr := sendBlockReply(srv, block); err ! =nil{}Copy the code
7.5 The loop ends and the message is sent successfully
if err := sendStatusReply(srv, cb.Status_SUCCESS);
Copy the code
Deliver the service client
Using the Leader master node as an example, analyze the flow of a Deliver service client requesting a block from an Orderer node.
Initialize the Deliver service instance
Entry: gossip/service/gossip_service. Go/InitializeChannel
func (g *gossipServiceImpl) InitializeChannel(chainID string, endpoints []string, support Support){... g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator)if g.deliveryService[chainID] == nil { // Check whether a Deliver service instance already exists
var err error
g.deliveryService[chainID], err = g.deliveryFactory.Service(g, endpoints, g.mcs) // Check whether a Deliver service instance already exists./ / peer. Gossip. Leader useLeaderElection and peer.gossip.org are two mutually exclusive configuration parameters,
// If both are set to true and not defined, the Peer node will fail
// Enable the dynamic election mechanism for the Leader primary node
leaderElection := viper.GetBool("peer.gossip.useLeaderElection")
// Statically set as the organization Leader primary node
isStaticOrgLeader := viper.GetBool("peer.gossip.orgLeader")...if leaderElection { // Enable the dynamic Leader primary node election mechanism
logger.Debug("Delivery uses dynamic leader election mechanism, channel", chainID)
g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, support.Committer))
} else if isStaticOrgLeader {
// If the Leader primary node is statically specified, connect to the Orderer node to request block data
// Start the Deliver service instance on the specified channel to request block data
g.deliveryService[chainID].StartDeliverForChannel(chainID, support.Committer, func(a){})}... }Copy the code
Firstly, check whether the Deliver instance already exists, and then enter different branches according to the dynamic election mechanism of the Leader primary node or the statically designated Leader primary node. If the Leader primary node is statically designated, The Orderer node is connected to request block data, and the Deliver service instance on the specified channel is started to request block data. Next, focus on starting the Deliver service instance.
Start the Deliver service instance
Mainly did the following things:
① : Gets the block provider of the channel specified by the binding
if _, exist := d.blockProviders[chainID];
Copy the code
② : There is no block provider
client := d.newClient(chainID, ledgerInfo)
Copy the code
func (d *deliverServiceImpl) newClient(chainID string, ledgerInfoProvider blocksprovider.LedgerInfo) *broadcastClient {
requester := &blocksRequester{ // Define the block requester blocksRequester structure object
tls: comm.TLSEnabled(),
chainID: chainID,
}
// Define the broadcastSetup() method
broadcastSetup := func(bd blocksprovider.BlocksDeliverer) error {
return requester.RequestBlocks(ledgerInfoProvider) // Request block data}...// Create connProducer objects
connProd := comm.NewConnectionProducer(d.conf.ConnFactory(chainID), d.conf.Endpoints)
//// Create the broadcastClient
bClient := NewBroadcastClient(connProd, d.conf.ABCFactory, broadcastSetup, backoffPolicy)
requester.client = bClient // Set the client to the block requester object
return bClient
}
Copy the code
2.1 Creating the broadcastClient on the Deliver Service instance
client := d.newClient(chainID, ledgerInfo)
Copy the code
2.2 Create the block provider associated with the specified channel
d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
Copy the code
The Goroutine starts to request blocks from the Orderer node and send them to other Peer nodes in the organization
go func(a) {
d.blockProviders[chainID].DeliverBlocks() // Request block data
finalizer()
}()
Copy the code
The next step is to call the DeliverBlocks() method of the block provider object and send the block data requested by the message to the Orderer node.
Request block data
The entrance: core/deliverservice blocksprovider/blocksprovider. Go/DeliverBlocks (), specific analysis is as follows:
① : Receives messages
msg, err := b.client.Recv()
Copy the code
② : The message is processed according to the message type
There are roughly the following message types:
- DeliverResponse_Status: Used to describe
Deliver
Service request execution status. - DeliverResponse_Block: Contains the requested block data.
2.1 DeliverResponse_Status branch
If the DeliverBlocks() method receives Status_SUCCESS, the block request has been successfully processed, indicating that the block data within the specified range of the block request message has been received. Other status messages are unsuccessful execution status messages, including Status_BAD_REQUEST and Status_FORBIDDEN
if t.Status == common.Status_SUCCESS {}
if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN {}
if t.Status == common.Status_BAD_REQUEST {
b.client.Disconnect(false)}else {
b.client.Disconnect(true)}Copy the code
2.2 DeliverResponse_Block branch
2.2.1 Obtaining the block number
seqNum := t.Block.Header.Number
Copy the code
2.2.2 Get the serialized block byte array
marshaledBlock, err := proto.Marshal(t.Block)
Copy the code
2.2.3 Verify blocks
err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), seqNum, marshaledBlock);
Copy the code
2.2.4 Obtaining the Number of Channel peers
numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
Copy the code
2.2.5 Creating message payload and Gossip Messages
payload := createPayload(seqNum, marshaledBlock)
gossipMsg := createGossipMsg(b.chainID, payload)
Copy the code
2.2.6 Add the message load to the local message load buffer and wait for the ledger to be committed
err := b.gossip.AddPayload(b.chainID, payload)
Copy the code
2.2.7 Sending block messages to other nodes in the organization using the Gossip message protocol
Based on the Gossip message protocol, DataMsg data messages (containing only block data) are distributed to other Peer nodes in the organization and stored in the message storage of this node.
b.gossip.Gossip(gossipMsg)
Copy the code
reference
Github.com/blockchainG… (Article picture code information)
Wechat official account: Blockchain technology stack