Digging into the Hyperledger Fabric source | 4-8 Committer (ledger-committing) node

Article and code: github.com/blockchainG…

Branch: v1.1.0

Overview

The source code that implements the functional modules of the Committer node is mainly distributed across the directories listed in the following table:

Source directory   File                Function
core/committer     txvalidator         Transaction validator function module
core/committer     committer.go        Ledger committer interface definition
core/committer     committer_impl.go   Ledger committer implementation
core/ledger        kvledger            KvLedger ledger function module
core/ledger        ledgerstorage       Ledger data storage object module
core/ledger        pvtdatastorage      Private data storage object module
core/ledger        ledgermgmt          Ledger management module
core/ledger        customtx            Configuration transaction processor module
common/ledger      blkstorage          Block storage module
protos             common, ledger      Protobuf message definition modules

The analysis below follows these modules.

Create a Committer function module

When a Peer node receives a request to join an application channel, it calls the CSCC system chaincode and executes joinChain() → peer.CreateChainFromBlock() → createChain() to create the channel's chain structure object from the genesis block of the application channel. This object manages the ledger, the channel configuration, and other resources so that the peer can receive the channel's ledger blocks normally.

Next, a transaction validator is created; it encapsulates a vsccValidatorImpl structure object to support invoking the VSCC system chaincode.

Then a ledger committer is created together with a callback function, eventer, which automatically updates the latest configuration block on the chain structure when a configuration block is committed to the ledger.

Now enter createChain for analysis:

func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
	...
	vcs := struct { // Construct a new validation chaincode support object
		*chainSupport
		*semaphore.Weighted
		Support
	}{cs, validationWorkersSemaphore, GetSupport()}
	validator := txvalidator.NewTxValidator(vcs) // Create a transaction validator
	// Create the ledger committer
	c := committer.NewLedgerCommitterReactive(ledger, func(block *common.Block) error {
		chainID, err := utils.GetChainIDFromBlock(block)
		if err != nil {
			return err
		}
		return SetCurrConfigBlock(block, chainID)
	})
...
	// Create a transient privacy data store object
	store, err := transientStoreFactory.OpenStore(bundle.ConfigtxValidator().ChainID())
	...
	// Initialize the Gossip message module on the specified channel.
	// If this peer is the leader, block data is fetched from the Orderer service node; otherwise, data is synchronized from other peers in the organization
	service.GetGossipService().InitializeChannel(bundle.ConfigtxValidator().ChainID(), ordererAddresses, service.Support{
		Validator: validator,
		Committer: c,
		Store:     store,
		Cs:        simpleCollectionStore,
	})

	chains.Lock()
	defer chains.Unlock()
	// Construct the new chain structure and insert the Peer node chain structure
	chains.list[cid] = &chain{
		cs:        cs, // Chain support object
		cb:        cb, // Configuration block
		committer: c,  // Ledger committer
	}
	return nil
}

Verify the validity of transaction data

The validation entry point is validateTx() in core/committer/txvalidator/validator.go. It mainly does the following:

① : Parse the transaction data to obtain the Envelope structure object

if env, err := utils.GetEnvelopeFromBlock(d); err != nil {...}

② : Check whether the transaction format is correct, whether the signature is valid, and whether the transaction content has been tampered with

		if payload, txResult = validation.ValidateTransaction(env, v.support.Capabilities()); txResult != peer.TxValidationCode_VALID {
			logger.Errorf("Invalid transaction with index %d", tIdx)
			results <- &blockValidationResult{
				tIdx:           tIdx,
				validationCode: txResult,
			}
			return
		}

③ : Parse and obtain the channel header

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)

④ : Check whether the channel's chain structure exists

channel := chdr.ChannelId
if !v.chainExists(channel) {...}

⑤ : Process the message according to the Header type

5.1 Ordinary transaction messages

First, look up the transaction ID in the ledger to check whether the transaction already exists there. Then obtain the transaction's read-write set and check that the write set is legal, call VSCC to verify the transaction against the endorsement policy, obtain the chaincode instance of the transaction, and record the invoked chaincode instance.

txID = chdr.TxId
// Look up the transaction ID in the ledger and check whether it already exists
if _, err := v.support.Ledger().GetTransactionByID(txID); err == nil {...}
// Call VSCC to verify the transaction endorsement policy
err, cde := v.vscc.VSCCValidateTx(payload, d, env)
...
// Get the transaction chaincode instance
invokeCC, upgradeCC, err := v.getTxCCInstance(payload)
...
// Record the invoked chaincode instance
txsChaincodeName = invokeCC

5.2 Channel configuration transaction messages

The configuration transaction object is parsed, and then the channel configuration is updated.

// Parse the channel configuration transaction message to obtain the configuration transaction object
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
...
// Update the channel configuration
if err := v.support.Apply(configEnvelope); err != nil {...}

5.3 Peer resource update messages: construct a blockValidationResult and return

⑥ : Serialize the transaction Envelope structure object

		if _, err := proto.Marshal(env); err != nil {
			logger.Warningf("Cannot marshal transaction: %s", err)
			results <- &blockValidationResult{
				tIdx:           tIdx,
				validationCode: peer.TxValidationCode_MARSHAL_TX_ERROR,
			}
			return
		}

⑦ : The transaction has now passed validation; construct the block validation result object from the values gathered above

results <- &blockValidationResult{
			tIdx:                 tIdx,
			txsChaincodeName:     txsChaincodeName,
			txsUpgradedChaincode: txsUpgradedChaincode,
			validationCode:       peer.TxValidationCode_VALID,
			txid:                 txID,
		}
		return
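The repeated `results <- &blockValidationResult{...}` sends suggest that every transaction reports exactly one verdict on a shared channel. The following is a minimal sketch of that pattern, assuming one goroutine per transaction. The types `validationCode` and `result` and the functions `validateOne` and `validateBlock` are hypothetical stand-ins, not Fabric APIs.

```go
package main

import "fmt"

// Hypothetical validation codes; the real ones live in Fabric's peer protobuf package.
type validationCode int

const (
	codeValid validationCode = iota
	codeNilEnvelope
	codeDuplicateTxID
)

// result mirrors the idea of blockValidationResult: a transaction index plus a verdict.
type result struct {
	tIdx int
	code validationCode
}

// validateOne stands in for validateTx: it checks one transaction and
// sends exactly one result on the channel before returning.
func validateOne(tIdx int, txID string, committed map[string]bool, results chan<- result) {
	if txID == "" { // nothing to parse
		results <- result{tIdx, codeNilEnvelope}
		return
	}
	if committed[txID] { // the ID is already in the ledger
		results <- result{tIdx, codeDuplicateTxID}
		return
	}
	results <- result{tIdx, codeValid}
}

// validateBlock starts one goroutine per transaction and collects the verdicts
// into a slice indexed by transaction position.
func validateBlock(txIDs []string, committed map[string]bool) []validationCode {
	results := make(chan result)
	for i, id := range txIDs {
		go validateOne(i, id, committed, results)
	}
	flags := make([]validationCode, len(txIDs))
	for range txIDs {
		r := <-results
		flags[r.tIdx] = r.code
	}
	return flags
}

func main() {
	committed := map[string]bool{"tx-1": true}
	fmt.Println(validateBlock([]string{"tx-2", "", "tx-1"}, committed))
}
```

Because each result carries its own index, the collector does not depend on the order in which the goroutines finish.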

Committing the ledger

The LedgerCommitter.CommitWithPvtData() method is responsible for performing the actual ledger commit. It first calls the committer's lc.preCommit(blockAndPvtData.Block) method to preprocess the block to be committed; for a configuration block, this runs the custom lc.eventer(block) callback, which calls SetCurrConfigBlock() to look up chains.list[cid] in the local chain dictionary and update its latest configuration block. It then calls lc.PeerLedger.CommitWithPvtData(blockAndPvtData) → kvLedger.CommitWithPvtData() to commit the data to the ledger; this method is the core of the commit work. After the ledger commit succeeds, lc.postCommit(blockAndPvtData.Block) is called to create a block event and a filtered block event from the block, and producer.Send() is executed to send both events to the event server, notifying subscribed clients that a new block has arrived.

Enter the CommitWithPvtData() method:

func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) error {
	var err error
	block := pvtdataAndBlock.Block                 // Get the block object
	blockNo := pvtdataAndBlock.Block.Header.Number // Get the block number
	// Validate and prepare block and private data objects
	err = l.txtmgmt.ValidateAndPrepare(pvtdataAndBlock, true)
	...
	// Commit block and private data to ledger
	if err = l.blockStore.CommitWithPvtData(pvtdataAndBlock); err != nil {
		return err
	}
	...	
	if err = l.txtmgmt.Commit(); err != nil { // Update valid transaction data to the state database
		panic(fmt.Errorf(`Error during commit to txmgr:%s`, err))
	}
	if ledgerconfig.IsHistoryDBEnabled() {
		logger.Debugf("Channel [%s]: Committing block [%d] transactions to history database", l.ledgerID, blockNo)
		if err := l.historyDB.Commit(block); err != nil { // Update block data to the history database
			panic(fmt.Errorf(`Error during commit to history db:%s`, err))
		}
	}
	return nil
}

This function does the following key things:

  • ValidateAndPrepare: Validates and prepares the block and private data objects
  • CommitWithPvtData: Commits the block and private data to the ledger
  • txtmgmt.Commit(): Updates valid transaction data to the state database
  • l.historyDB.Commit(block): Updates block data to the history database

The details of each feature are described below.

Validate and prepare block and private data objects

Function call:

err = l.txtmgmt.ValidateAndPrepare(pvtdataAndBlock, true)

-> batch, err := txmgr.validator.ValidateAndPrepareBatch(blockAndPvtdata, doMVCCValidation)

func validateAndPreparePvtBatch(block *valinternal.Block, pvtdata map[uint64]*ledger.TxPvtData) (*privacyenabledstate.PvtUpdateBatch, error) {
	pvtUpdates := privacyenabledstate.NewPvtUpdateBatch()
	for _, tx := range block.Txs {
		if tx.ValidationCode != peer.TxValidationCode_VALID {
			continue
		}
		if !tx.ContainsPvtWrites() {
			continue
		}
		txPvtdata := pvtdata[uint64(tx.IndexInBlock)] // Get the private data of the specified transaction
		if txPvtdata == nil {                         // Skip transactions without private data
			continue
		}
		// Check to see if you need to validate private data
		if requiresPvtdataValidation(txPvtdata) {
			// Verify that the private data hashes match
			if err := validatePvtdata(tx, txPvtdata); err != nil {
				return nil, err
			}
		}
		var pvtRWSet *rwsetutil.TxPvtRwSet
		var err error
		// Parse the private data write set
		if pvtRWSet, err = rwsetutil.TxPvtRwSetFromProtoMsg(txPvtdata.WriteSet); err != nil {
			return nil, err
		}
		// Add to privacy data update batch operation
		addPvtRWSetToPvtUpdateBatch(pvtRWSet, pvtUpdates, version.NewHeight(block.Num, uint64(tx.IndexInBlock)))
	}
	return pvtUpdates, nil
}

First, the transaction list block.Txs of the current internal block is traversed. For each transaction object tx, the following three kinds of transactions are skipped:

  • Invalid transactions, whose validation code is not TxValidationCode_VALID
  • Transactions that contain no private data writes (no private data hashes in the write set)
  • Transactions without private data

If a transaction passes these checks and requiresPvtdataValidation(txPvtdata) reports that validation is needed, validatePvtdata(tx, txPvtdata) is called on the valid transaction tx and its private data txPvtdata (of type TxPvtData) to verify the private data hashes. Since private data is generated by Endorser nodes, it is necessary to check whether the transmitted data has been tampered with. The general process is as follows:

func validatePvtdata(tx *valinternal.Transaction, pvtdata *ledger.TxPvtData) error {
	...
	for _, nsPvtdata := range pvtdata.WriteSet.NsPvtRwset {
		for _, collPvtdata := range nsPvtdata.CollectionPvtRwset {
			// Compute private data hashes based on raw data
			collPvtdataHash := util.ComputeHash(collPvtdata.Rwset)
			// Get the hash value of the data in the transaction
			hashInPubdata := tx.RetrieveHash(nsPvtdata.Namespace, collPvtdata.CollectionName)
			// Compare private data hashes
			if !bytes.Equal(collPvtdataHash, hashInPubdata) {
				...
			}
		}
	}
	return nil
}

Submit block and private data to ledger

Function call: l.blockStore.CommitWithPvtData(pvtdataAndBlock)

core/ledger/ledgerstorage/store.go/CommitWithPvtData

func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error {
	...
	for _, v := range blockAndPvtdata.BlockPvtData {
		// Add private data to private data list pvtData
		pvtdata = append(pvtdata, v)
	}
	// Prepare the private data list pvtdata for commit to the ledger: prepare first, confirm later
	if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, pvtdata); err != nil {
		return err
	}
	// Commit the block to the ledger
	if err := s.AddBlock(blockAndPvtdata.Block); err != nil {
		s.pvtdataStore.Rollback()
		return err
	}
	// Confirm the submission of private data
	return s.pvtdataStore.Commit()
}

In short, it does three things:

  1. Prepare to commit the private data: s.pvtdataStore.Prepare
  2. Commit the block data: s.AddBlock
  3. Confirm the commit of the private data: s.pvtdataStore.Commit()

① : Prepare to commit the private data

The private data store object calls s.pvtdataStore.Prepare() → store.Prepare(), which re-encodes each private data object in the pvtData list into a KV key-value pair, adds the pairs to a batch of update operations for the ledger's private database, and writes the batch to the database synchronously. Later, once the block commit has finished, the private data is either confirmed or rolled back, depending on the result of that commit.

func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData) error {
	// batchPending should be false when Prepare() is called, because the Commit and Rollback operations reset this flag
	if s.batchPending {
		return &ErrIllegalCall{`A pending batch exists as as result of last invoke to "Prepare" call. Invoke "Commit" or "Rollback" on the pending batch before invoking "Prepare" function`}
	}
	// Get the next expected block number
	expectedBlockNum := s.nextBlockNum()
	// Check the validity of the block number
	if expectedBlockNum != blockNum {
		return &ErrIllegalArgs{fmt.Sprintf("Expected block number=%d, recived block number=%d", expectedBlockNum, blockNum)}
	}
	// Create database update operation set Batch. Records all key keys whose data needs to be deleted or added
	batch := leveldbhelper.NewUpdateBatch()
	var key, value []byte
	var err error
	// Traverse the private data list to construct the private data KV key-value pairs
	for _, txPvtData := range pvtData {
		// Construct key: block number and transaction sequence number
		key = encodePK(blockNum, txPvtData.SeqInBlock)
		// Construct value: private data write set
		if value, err = encodePvtRwSet(txPvtData.WriteSet); err != nil {
			return err
		}
		logger.Debugf("Adding private data to LevelDB batch for block [%d], tran [%d]", blockNum, txPvtData.SeqInBlock)
		// Add the private data key-value pair operation
		batch.Put(key, value)
	}
	// Add the operation of the pendingCommitKey pair
	batch.Put(pendingCommitKey, emptyValue)
	// Synchronously executes a set of database update operations
	if err := s.db.WriteBatch(batch, true); err != nil {
		return err
	}
	// Update the status flag bit
	s.batchPending = true
	logger.Debugf("Saved %d private data write sets for block [%d]", len(pvtData), blockNum)
	return nil
}

② : Submit block data

The s.AddBlock(blockAndPvtdata.Block) method actually goes through the block file manager and calls blockfileMgr.addBlock() to append the new block blockAndPvtdata.Block to the block data file and to save the new block checkpoint information, newCPInfo. Next, the indexBlock() method is called to build the index information for the current block and the index checkpoint information (current block number, etc.), which are written to the block index database. Then mgr.updateCheckpoint(newCPInfo) is called to update the block checkpoint information on the block file manager, and mgr.cpInfoCond.Broadcast() is executed to wake up all goroutines waiting on that condition variable, notifying them that a new block has been committed to the ledger. Finally, mgr.updateBlockchainInfo() is called to update the blockchain information, such as the latest block height and the latest block header hash.
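The checkpoint update followed by a broadcast is a standard condition-variable pattern. Here is a minimal sketch using sync.Cond; the `manager` and `checkpoint` types and the `waitForBlock` helper are simplified, hypothetical stand-ins for the block file manager, not its actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// checkpoint is a simplified stand-in for the block file manager's checkpoint info.
type checkpoint struct {
	lastBlockNumber uint64
}

type manager struct {
	mu     sync.Mutex
	cond   *sync.Cond
	cpInfo checkpoint
}

func newManager() *manager {
	m := &manager{}
	m.cond = sync.NewCond(&m.mu)
	return m
}

// updateCheckpoint records the new checkpoint and wakes every waiting goroutine.
func (m *manager) updateCheckpoint(cp checkpoint) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.cpInfo = cp
	m.cond.Broadcast()
}

// waitForBlock blocks until a block with at least the given number has been committed.
// The loop re-checks the condition after every wake-up, as sync.Cond requires.
func (m *manager) waitForBlock(num uint64) uint64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	for m.cpInfo.lastBlockNumber < num {
		m.cond.Wait()
	}
	return m.cpInfo.lastBlockNumber
}

func main() {
	m := newManager()
	done := make(chan uint64)
	go func() { done <- m.waitForBlock(3) }()
	for n := uint64(1); n <= 3; n++ {
		m.updateCheckpoint(checkpoint{lastBlockNumber: n})
	}
	fmt.Println("waiter saw block", <-done)
}
```

Broadcast rather than Signal fits this case because several readers may be waiting for different block numbers, and each one decides for itself whether its condition now holds.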

③ : Confirm the submission of private data

Call the s.pvtdataStore.Commit() method to confirm the commit of the private data. Because Prepare() has already written all the private data key-value pairs into the database, this method only deletes the pendingCommitKey pair from the private database and adds the lastCommittedBlkkey pair, which stores the most recently committed block number, committingBlockNum. Finally, it updates the flags and variables related to the private data: batchPending and isEmpty are set to false, and lastCommittedBlock is updated to committingBlockNum.

If committing the block data fails, CommitWithPvtData() rolls back by calling s.pvtdataStore.Rollback() on the private data store object, which restores the private database to the state it was in before Prepare() wrote to it.

Submit data to the state database

Entry point: Commit() in core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go

func (txmgr *LockBasedTxMgr) Commit() error {
	...
	if err := txmgr.db.ApplyPrivacyAwareUpdates(txmgr.batch,
		version.NewHeight(txmgr.currentBlock.Header.Number, uint64(len(txmgr.currentBlock.Data.Data)-1))); err != nil {
		return err
	}
	...
}

func (s *CommonStorageDB) ApplyPrivacyAwareUpdates(updates *UpdateBatch, height *version.Height) error {
	addPvtUpdates(updates.PubUpdates, updates.PvtUpdates)
	addHashedUpdates(updates.PubUpdates, updates.HashUpdates, !s.BytesKeySuppoted())
	return s.VersionedDB.ApplyUpdates(updates.PubUpdates.UpdateBatch, height)
}

It ends up in the following function:

core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go

func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
	dbBatch := leveldbhelper.NewUpdateBatch()
	namespaces := batch.GetUpdatedNamespaces()
	for _, ns := range namespaces {
		updates := batch.GetUpdates(ns)
		for k, vv := range updates {
			compositeKey := constructCompositeKey(ns, k)
			logger.Debugf("Channel [%s]: Applying key(string)=[%s] key(bytes)=[%#v]", vdb.dbName, string(compositeKey), compositeKey)

			if vv.Value == nil {
				dbBatch.Delete(compositeKey)
			} else {
				dbBatch.Put(compositeKey, statedb.EncodeValue(vv.Value, vv.Version))
			}
		}
	}
	dbBatch.Put(savePointKey, height.ToBytes())
	// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
	if err := vdb.db.WriteBatch(dbBatch, true); err != nil {
		return err
	}
	return nil
}

The ApplyUpdates() method does the following:

  1. Iterates over the key-value pairs (key k and value vv) contained in the update batch and calls constructCompositeKey(ns, k) to build the composite key compositeKey.
  2. Checks whether the operation is a delete. If vv.Value is nil, the update is a delete operation, and dbBatch.Delete(compositeKey) is called to add the delete to the dbBatch object. Otherwise, the block number and transaction sequence number in vv.Version are encoded and serialized into a byte array, combined with vv.Value into the encoded value, and the write is added to the dbBatch object.
  3. Calls dbBatch.Put to add the savepoint key-value pair, where the key is []byte{0x00} and the value is the encoded, serialized byte array of the version height.
  4. Calls vdb.db.WriteBatch to write the dbBatch updates to the state database synchronously as one atomic operation. Note that the keys are rebuilt once more when writing to the database: the original key is prefixed with the database name (chain ID / ledger ID), that is, []byte(dbName)+[]byte{0x00}, to isolate the state data of different channels.

Updating the historical database

Call the l.historyDB.Commit(block) method to write the valid, Endorser-endorsed transaction data in the block to the history database, as follows:

core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go

func (historyDB *historyDB) Commit(block *common.Block) error {

	blockNo := block.Header.Number // Get the block number
	//Set the starting tranNo to 0
	var tranNo uint64
	// Get the invalidation byte array for the block,
	// i.e. the list of transaction validation flags
	txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
	// Initialize txsFilter if it does not yet exist (e.g. during testing, for genesis block, etc)
	if len(txsFilter) == 0 {
		txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
		block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
	}

	// write each tran's write set to history db
	for _, envBytes := range block.Data.Data { // Iterate over all transaction data in block
		if txsFilter.IsInvalid(int(tranNo)) { // Filter out invalid transactions
			logger.Debugf("Channel [%s]: Skipping history write for invalid transaction number %d",
				historyDB.dbName, tranNo)
			tranNo++
			continue
		}
		// Parse and get the transaction message Envelope structure object
		env, err := putils.GetEnvelopeFromBlock(envBytes)
		if err != nil {
			return err
		}

	...
		// Check type: ordinary trade message endorsed by Endorser
		if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {

			// extract actions from the envelope message
			// Parse and extract chain code actions from transaction messages
			respPayload, err := putils.GetActionFromEnvelope(envBytes)
			if err != nil {
				return err
			}
...
			// Parse the transaction read/write set into TxReadWriteSet structure objects
			if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
				return err
			}
			// Go through all read/write sets and reconstruct the KV key-value pair to add to the historical database
			for _, nsRWSet := range txRWSet.NsRwSets {
				ns := nsRWSet.NameSpace

				for _, kvWrite := range nsRWSet.KvRwSet.Writes {
					writeKey := kvWrite.Key

					// Create a composite key
					compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)

					// Write an empty array of bytes []byte{}
					dbBatch.Put(compositeHistoryKey, emptyValue)
				}
			}

		} else {
			// Skip the trade because the message is not a normal trade message endorsed by an Endorser
			logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
		}
		tranNo++
	}
	height := version.NewHeight(blockNo, tranNo) // Create a version object
	dbBatch.Put(savePointKey, height.ToBytes())  // Add savepoints for recovery

	// Update dbBatch to the historical database
	if err := historyDB.db.WriteBatch(dbBatch, true); err != nil {
		return err
	}
...
}

This concludes the analysis of the ledger commit function.
