Start screwing hyperledger fabric source | Peer node
Article and code: github.com/blockchainG…
Branches: v1.1.0
Startup Process Overview
Entry: peer/main. Go:
The main() function is responsible for initializing the peer main command object, registering the subcommand and initializing the environment configuration, parsing the user input subcommand start and starting the peer node, including the following process steps:
- Define and register commands and initialize basic configurations. Based on the
Cobra
Component defines the peer main command objectmainCmd
, and through theViper
Component callsInitConfig
() function from localcore.yaml
Configuration files, environment variables, command-line options, and so on are read and parsedpeer
Command configuration. At the same time, initialize the main commandmainCmd
Flag bit option ofversion
,logging-level
Wait, then in the master commandmainCmd
Registered onVersion, Node, ChainCode, channel
To set the maximum available valueCPU
Kernel and log backend; - Initializing the local
MSP
Components. throughViper
Component to getMSP
Component configuration file pathmspMgrConfigDir
,BCCSP
Configuration itemsbccspConfig
, MSP name ID localMSPID,MSP
Component typelocalMSPType
Etc., based on these four parameters to construct localMSP
Configure the object, and then create the defaultbccspmsp
Structure object as localMSP
Component, and parseMSP
Configure objects and initialize the localMSP
Components; - Execute the maincmd.execute () method to start
Peer
node
These key parts will be discussed in detail in the following sections.
Define and register commands and initialize basic configurations
Defining the master command
Code analysis is as follows:
var mainCmd = &cobra.Command{ // Build the main command based on Cobra components
Use: "peer".// Define the command usage method
//// defines the execution function
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
// Check the CORE_LOGGING_LEVEL environment variable to override all other logging Settings. Otherwise, use the configuration values in the core.yaml file
var loggingSpec string
if viper.GetString("logging_level") != "" {
loggingSpec = viper.GetString("logging_level") // Get the log level in the configuration file
} else {
loggingSpec = viper.GetString("logging.level") // Get the log level in the configuration file
}
flogging.InitFromSpec(loggingSpec) // Initialize the logger based on the configured log level
return nil
},
Run: func(cmd *cobra.Command, args []string) { // Define the execution function
if versionFlag {
fmt.Print(version.GetInfo()) // Prints the version of the peer program
} else {
cmd.HelpFunc()(cmd, args) // Directly prints the help information about the command}},Copy the code
Register subcommands
Register several types of subcommands with the main command. The types are as follows:
- Channel channel subcommands are used to create application channels, obtain blocks, add Peer nodes to application channels, obtain the list of application channels to which the Peer node is added, update application channel configurations, configure signature transaction files, and obtain information about the specified application channels
Create, FETCH, Join, list, update, SignConfigTX, getInfo
Equal subcommand; - Chaincode subcommands: used to install chain codes, instantiate (deploy) chain codes, invoke chain codes, package chain codes, query chain codes, signature chaincode packages, upgrade chain codes, obtain channel chaincode list, etc., including
Install, Instantiate, Invoke, Package, Query, SignPackage, Upgrade, list
Equal subcommand; - Node node commands: used to manage node service processes and query service status, including
The start, the status,
Equal subcommand; - Logging Subcommand: Used to obtain, set, and restore log level functions, including
Getlevel, setLevel, revertLevels
Equal subcommand; - Version Version subcommand: Used to print information
Fabric
In thePeer
Node server version information.
viper.SetEnvPrefix(cmdRoot) // Set the environment variable prefix core
viper.AutomaticEnv() // Find matching environment variables
replacer := strings.NewReplacer("."."_") // Create a replacement
viper.SetEnvKeyReplacer(replacer) // Set the environment variable substitution
// Define a set of command-line options that are valid for all peers and their subcommands
mainFlags := mainCmd.PersistentFlags()
// Set the binding of version and logging-level options
mainFlags.BoolVarP(&versionFlag, "version"."v".false."Display current version of fabric peer server")
mainFlags.String("logging-level".""."Default logging level and overrides, see core.yaml for full syntax")
// Viper configures the binding command line option
viper.BindPFlag("logging_level", mainFlags.Lookup("logging-level"))
// Register subcommands
mainCmd.AddCommand(version.Cmd()) // version subcommand
mainCmd.AddCommand(node.Cmd()) // node subcommands start and status
mainCmd.AddCommand(chaincode.Cmd(nil)) // Chaincode subcommand install etc
mainCmd.AddCommand(clilogging.Cmd(nil)) // cli sub command getlevel
mainCmd.AddCommand(channel.Cmd(nil)) // channel subcommand create, etc
// Load the configuration file core.yaml
err := common.InitConfig(cmdRoot)
...
runtime.GOMAXPROCS(viper.GetInt("peer.gomaxprocs")) // Set the maximum number of available CPU cores
// setup system-wide logging backend based on settings from core.yaml
// Initialize the system log backend
flogging.InitBackend(flogging.SetFormat(viper.GetString("logging.format")), logOutput)
Copy the code
Initialize the local MSP component
MSP component is an important security module for managing local membership. It encapsulates the root CA certificate and local signer entity.
// Initialize the local MSP component object
var mspMgrConfigDir = config.GetPath("peer.mspConfigPath") // Obtain the path of the MSP configuration file
var mspID = viper.GetString("peer.localMspId") // Get the local MSP name
var mspType = viper.GetString("peer.localMspType") // Get the local MSP component type
if mspType == "" {
// The MSP component type is FABRIC by default
mspType = msp.ProviderTypeToString(msp.FABRIC)
}
// Obtain the configuration information of the BCCSP component and initialize the MSP component object
err = common.InitCrypto(mspMgrConfigDir, mspID, mspType)
Copy the code
Execute the master command
The function is as follows:
ifmainCmd.Execute() ! =nil {}
Copy the code
Next to the Execute () function to analysis: / vendor/github.com/spf13/cobra/command.go/ExecuteC ()
func (c *Command) ExecuteC(a) (cmd *Command, err error) {}
Copy the code
Call the Execute() method through Cobra and run the peer node start command to start the peer node. The Cobra component parses the command-line options entered by the user and then executes all the relevant execution methods defined in the nodeStartCmd object, in the following order as defined in the cobra.Com command
- / PersistentPreRun PersistentPreRunE () ();
- / PreRun PreRunE () ();
- RunE/Run () ();
- / PostRun PostRunE () ();
- PersistentPostRunE()/PersistentPostRun();
So far, the node command starts execution, because this part is mainly about node startup, so the following focuses on the running process of node startup command execution.
The node starts the command execution
The command to start a node can be found in the following code path:
mainCmd.AddCommand(node.Cmd())
Copy the code
func Cmd(a) *cobra.Command {
nodeCmd.AddCommand(startCmd())
nodeCmd.AddCommand(statusCmd())
return nodeCmd
}
Copy the code
Only node start commands are discussed here
func startCmd(a) *cobra.Command{...return nodeStartCmd
}
Copy the code
var nodeStartCmd = &cobra.Command{
...
return serve(args)
},
}
Copy the code
Enter the serve function discussion:
Initializing resources
① Obtain the local MSP component type and check the MSP component type
Currently, Hyperledger Fabric supports Fabric MSP and IDEMIX MSP. By default, the Fabric MSP based on BCCSP is used.
mspType := mgmt.GetLocalMSP().GetType() // Get the local MSP component type
ifmspType ! = msp.FABRIC {// Check the MSP component type
panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
}
Copy the code
② : Initializes the resource access policy provider
aclmgmt.RegisterACLProvider(nil)
Copy the code
③ : Initialize the local ledger manager
ledgermgmt.Initialize(peer.ConfigTxProcessors)
Copy the code
core/ledger/ledgermgmt/ledger_mgmt.go/initialize
func initialize(customTxProcessors customtx.Processors) {
logger.Info("Initializing ledger mgmt")
lock.Lock()
defer lock.Unlock() // Set the initialization flag of the Peer node to true
initialized = true
// Create an open ledger dictionary openedLedgers
openedLedgers = make(map[string]ledger.PeerLedger)
// Initialize the transaction message handler dictionary to the global variable Processors dictionary
customtx.Initialize(customTxProcessors)
cceventmgmt.Initialize() // Initializes the chain event manager
provider, err := kvledger.NewProvider() // Create a local Peer node ledger provider
iferr ! =nil {
panic(fmt.Errorf("Error in instantiating ledger provider: %s", err))
}
provider.Initialize(kvLedgerStateListeners) // Initialize the status listener
ledgerProvider = provider // Set to the global default Peer node ledger provider
logger.Info("ledger mgmt initialized")}Copy the code
The original providers are as follows:
-
Ledger ID database (idStore type) : LevelDB database that stores ledger ID (chain ID) and key value pairs of Genesis block;
-
Ledgerstorage. Provider: creates ledger data storage objects and manages block data files, privacy databases, block index databases, etc.
-
History database provider (HistoryDBProvider type) : Creates a history database that stores historical information about each state data;
-
Stateful Database provider (CommonStorageDBProvider type) : Creates a stateful database (LevelDB or CouchDB type) that stores the world state, including public and private data for valid transactions.
④ : Initializes server parameters
if chaincodeDevMode {
// Set the chain mode
viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
}
// Read the configuration and cache the Peer node address and endpoint
iferr := peer.CacheConfiguration(); err ! =nil {
return err
}
// Get the cached Peer endpoint
peerEndpoint, err := peer.GetPeerEndpoint()
...
var peerHost string
// Obtain the IP address of the Peer node. Note that the IP address and port have been separated
peerHost, _, err = net.SplitHostPort(peerEndpoint.Address)
Copy the code
Create a GRPC server
1: Create a gRPC server
The serve() function creates at least three gRPC servers (separate ports) for registering the Peer node function server, as shown below:
The serial number | port | Functional server | instructions | The service interface |
---|---|---|---|---|
1 | 7051 | DeliverEvents Event server | Process block request messages | Deliver() |
7051 | The Admin server | Obtain node status and maintenance logs | GetStatus() | |
7051 | Endorser Endorser server | Offer endorsement service | ProcessProposal() | |
7051 | Gossip message server | Data distribution and synchronization status of nodes within an organization | GossipStream() | |
2 | 7052 | ChaincodeSupport Support server | Provides the link codes of Peer nodes | Register() |
3 | 7053 | EventHub Event server | Provide subscription event service (deprecated in 1.3.0) | Chat() |
serverConfig, err := peer.GetServerConfig()
...
peerServer, err := peer.CreatePeerServer(listenAddr, serverConfig)
Copy the code
② : Create the EventHub event server
if serverConfig.SecOpts.UseTLS {
...
cs := comm.GetCredentialSupport() // Create the CredentialSupport structure object
cs.ServerRootCAs = serverConfig.SecOpts.ServerRootCAs
//// Obtain the gRPC client certificate for TLS connection authentication
clientCert, err := peer.GetClientCertificate()
// Set the client certificate
comm.GetCredentialSupport().SetClientCertificate(clientCert)
}
//// Create event EventHub server (port 7053)
ehubGrpcServer, err := createEventHubServer(serverConfig)
Copy the code
③ create the DeliverEvents event server
The serve() function checks if two-way TLS security authentication is enabled, the serve() function sets the mutualTLS flag bit to true and defines the policyCheckerProvider() function to obtain the resource policy checker. This function directly calls the CheckACL() method of the global variable aclProvider object to check whether the signed message meets the access control policy for the specified resource on the channelID.
Then serve () function call peer. NewDeliverEventsServer () function, based on the parameters such as mutualTLS, policy – CheckerProvider create server abServer DeliverEvents events, Deliver() and DeliverFiltered() service interfaces are provided for processing messages requesting normal blocks and filtered blocks, respectively.
The pb.registerDeliverServer () method is then called to register the DeliverEvents event server abServer with the default gRPC server (port 7051) to provide local event services.
//// Check whether two-way TLS security authentication is enabled
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
//// Defines the function for checking resource access permission policies
policyCheckerProvider := func(resourceName string) deliver.PolicyChecker {
return func(env *cb.Envelope, channelID string) error {
return aclmgmt.GetACLProvider().CheckACL(resourceName, channelID, env)
}
}
// Create DeliverEvents event server and register with Peer gRPC server (port 7051)
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverSupportManager{})
pb.RegisterDeliverServer(peerServer.Server(), abServer)
Copy the code
④ Create a ChaincodeSupport support server
// Create a dedicated gRPC server and ChaincodeSupport (dedicated port or 7052 port)
ccSrv, ccEndpoint, err := createChaincodeServer(ca, peerHost)
iferr ! =nil {
logger.Panicf("Failed to create chaincode server: %s", err)
}
// Register the ChaincodeSupoort object with the Peer gRPC server
// Register system chain code at the same time to support deployment call system chain code
registerChaincodeSupport(ccSrv, ccEndpoint, ca)
go ccSrv.Start() // Start the gRPC server to provide chain support services
Copy the code
⑤ : Create Admin management server and Endorser endorsement server
// Create Admin server and Admin server
pb.RegisterAdminServer(peerServer.Server(), core.NewAdminServer())
// Define the Gossip protocol to distribute private data functions
privDataDist := func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
return service.GetGossipService().DistributePrivateData(channel, txID, privateData)
}
// Create an EndorserServer EndorserServer
serverEndorser := endorser.NewEndorserServer(privDataDist, &endorser.SupportImpl{})
libConf := library.Config{}
if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err ! =nil {
return errors.WithMessage(err, "could not load YAML config")}//// Create a list of message filters
authFilters := library.InitRegistry(libConf).Lookup(library.Auth).([]authHandler.Filter)
// Construct all message filters as message Filter chains and return the first Filter (Filter type, implementing EndorserServer // interface)
auth := authHandler.ChainFilters(serverEndorser, authFilters...)
// Register the Endorser server
//// Register the EndorserServer to the gRPC server
pb.RegisterEndorserServer(peerServer.Server(), auth)
Copy the code
⑥ : Create the Gossip message server
// Get the initial node address list for the Bootstrap connection. The default is 127.0.0.1:7051
bootstrap := viper.GetStringSlice("peer.gossip.bootstrap")
//// Gets the local MSP signer identity entity and serializes it
serializedIdentity, err := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
...
messageCryptoService := peergossip.NewMCS( Construct the Gossip message encryption service component
peer.NewChannelPolicyManagerGetter(), // The channel Policy Manager gets the component
localmsp.NewSigner(), // Local signers
mgmt.NewDeserializersManager()) // Deserialize the identity component manager
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
// callback function for secure dial options for gossip service
// Define the Gossip server callback function to create gRPC dialup connection options for the Gossip server security configuration
secureDialOpts := func(a) []grpc.DialOption {
var dialOpts []grpc.DialOption
// set max send/recv msg sizes
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()),
grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize()))) // Set the maximum number of bytes for sending and receiving messages
// set the keepalive options
kaOpts := comm.DefaultKeepaliveOptions() // Get the default keepalive option for heartbeat messages.// Set the keepalive option for heartbeat communication in gRPC dialup connection options
dialOpts = append(dialOpts, comm.ClientKeepaliveOptions(kaOpts)...)
if comm.TLSEnabled() { // Enable TLS security authentication and set the CLIENT TLS communication certificate
dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCredentialSupport().GetPeerCredentials()))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure()) // Otherwise, disable TLS security authentication
}
return dialOpts
}
// Check whether TLS security authentication is enabled on the gRPC server and obtain and set the identity certificates of the server and client
var certs *common2.TLSCertificates
if peerServer.TLSEnabled() {
serverCert := peerServer.ServerCertificate()
clientCert, err := peer.GetClientCertificate()
iferr ! =nil {
return errors.Wrap(err, "failed obtaining client certificates")
}
certs = &common2.TLSCertificates{}
certs.TLSServerCert.Store(&serverCert)
certs.TLSClientCert.Store(&clientCert)
}
// Create the Gossip message server instance gossipServiceInstance
err = service.InitGossipService(serializedIdentity, peerEndpoint.Address, peerServer.Server(), certs,
messageCryptoService, secAdv, secureDialOpts, bootstrap...)
Copy the code
Deploys the system chain code and initializes the chain structure of existing channels
① : Deploy the system chain code
// Deploy the system chain code
initSysCCs()
Copy the code
② : Initializes the chain structure on the existing channel
// Initializes the chain structure on the existing channel
peer.Initialize(func(cid string){})Copy the code
core/peer/peer.go/Initialize
func Initialize(init func(string)) {
nWorkers := viper.GetInt("peer.validatorPoolSize") // Get the number of transaction validation threads
if nWorkers <= 0 {
nWorkers = runtime.NumCPU()
}
//// Sets the number of concurrent semaphore accesses
validationWorkersSemaphore = semaphore.NewWeighted(int64(nWorkers))
chainInitializer = init // Set the initialization function
var cb *common.Block
var ledger ledger.PeerLedger
//// Initialize the ledger manager
ledgermgmt.Initialize(ConfigTxProcessors)
//// gets a list of account ids under the current account manager
ledgerIds, err := ledgermgmt.GetLedgerIDs()
iferr ! =nil {
panic(fmt.Errorf("Error in initializing ledgermgmt: %s", err))
}
for _, cid := range ledgerIds {
peerLogger.Infof("Loading chain %s", cid)
// Create a local Peer account
ifledger, err = ledgermgmt.OpenLedger(cid); err ! =nil {
peerLogger.Warningf("Failed to load ledger %s(%s)", cid, err)
peerLogger.Debugf("Error while loading ledger %s with message %s. We continue to the next ledger rather than abort.", cid, err)
continue
}
//// gets the latest configuration block from the specified channel ledger
ifcb, err = getCurrConfigBlockFromLedger(ledger); err ! =nil {
peerLogger.Warningf("Failed to find config block on ledger %s(%s)", cid, err)
peerLogger.Debugf("Error while looking for config block on ledger %s with message %s. We continue to the next ledger rather than abort.", cid, err)
continue
}
// Create a chain if we get a valid ledger with config block
//// Creates a chain structure for the specified channel on the Peer node
iferr = createChain(cid, ledger, cb); err ! =nil {
peerLogger.Warningf("Failed to load chain %s(%s)", cid, err)
peerLogger.Debugf("Error reloading chain %s with message %s. We continue to the next chain rather than abort.", cid, err)
continue
}
// Initialize the channel chain structure with custom functions, such as deployment system chain code
InitChain(cid)
}
}
Copy the code
Start the gRPC server and profile server
// Establish a channel for passing error messages
serve := make(chan error)
// Signal channel
sigs := make(chan os.Signal, 1)
// Set the process signal channel notification signal, including interrupt/terminate signal
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func(a) {// Sets a specific notification signal for this process to block waiting
sig := <-sigs // Read signal value from sigS channel, block wait mode
logger.Debugf("sig: %s", sig)
serve <- nil} ()// Use goroutine to start gRPC server (port 7051, registered Admin server, Endorser server, etc.)
go func(a) {
var grpcErr error
ifgrpcErr = peerServer.Start(); grpcErr ! =nil { // Listening port (7051) provides services
grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
} else {
logger.Info("peer server exited")
}
serve <- grpcErr // If you exit because of an error, send an error to the serve channel} ()// Write the running process ID to the process file
if err := writePid(config.GetPath("peer.fileSystemPath") +"/peer.pid", os.Getpid()); err ! =nil {
return err
}
// Start the event hub server
// Start gRPC server based on dedicated event listening port (port 7053, registered EventHub event server)
ifehubGrpcServer ! =nil {
go ehubGrpcServer.Start()
}
// Start profiling http endpoint if enabled
// If the profile enable flag is turned on, the provisioning service is started
if viper.GetBool("peer.profile.enabled") {
go func(a) { // Start the GO Profile server. If there is an error, no error message will be sent
// Get the profile listening address
profileListenAddress := viper.GetString("peer.profile.listenAddress")
logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr ! =nil {
logger.Errorf("Error starting profiler: %s", profileErr)
}
}()
}
Copy the code
At this point, the Peer node and its function server have been started.
reference
Github.com/blockchainG…