Start screwing hyperledger fabric source | Peer node

Article and code: github.com/blockchainG…

Branches: v1.1.0

Startup Process Overview

Entry: peer/main. Go:

The main() function is responsible for initializing the peer main command object, registering the subcommand and initializing the environment configuration, parsing the user input subcommand start and starting the peer node, including the following process steps:

  • Define and register commands and initialize basic configurations. Based on theCobraComponent defines the peer main command objectmainCmd, and through theViperComponent callsInitConfig() function from localcore.yamlConfiguration files, environment variables, command-line options, and so on are read and parsedpeerCommand configuration. At the same time, initialize the main commandmainCmdFlag bit option ofversion,logging-levelWait, then in the master commandmainCmdRegistered onVersion, Node, ChainCode, channelTo set the maximum available valueCPUKernel and log backend;
  • Initializing the localMSPComponents. throughViperComponent to getMSPComponent configuration file pathmspMgrConfigDir,BCCSPConfiguration itemsbccspConfig, MSP name ID localMSPID,MSPComponent typelocalMSPTypeEtc., based on these four parameters to construct localMSPConfigure the object, and then create the defaultbccspmspStructure object as localMSPComponent, and parseMSPConfigure objects and initialize the localMSPComponents;
  • Execute the maincmd.execute () method to startPeernode

These key parts will be discussed in detail in the following sections.

Define and register commands and initialize basic configurations

Defining the master command

Code analysis is as follows:

var mainCmd = &cobra.Command{ // Build the main command based on Cobra components
	Use: "peer".// Define the command usage method
	//// defines the execution function
	PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
		// Check the CORE_LOGGING_LEVEL environment variable to override all other logging Settings. Otherwise, use the configuration values in the core.yaml file
		var loggingSpec string
		if viper.GetString("logging_level") != "" {
			loggingSpec = viper.GetString("logging_level") // Get the log level in the configuration file
		} else {
			loggingSpec = viper.GetString("logging.level") // Get the log level in the configuration file
		}
		flogging.InitFromSpec(loggingSpec) // Initialize the logger based on the configured log level
		return nil
	},
	Run: func(cmd *cobra.Command, args []string) { // Define the execution function
		if versionFlag {
			fmt.Print(version.GetInfo()) // Prints the version of the peer program
		} else {
			cmd.HelpFunc()(cmd, args) // Directly prints the help information about the command}},Copy the code

Register subcommands

Register several types of subcommands with the main command. The types are as follows:

  • Channel channel subcommands are used to create application channels, obtain blocks, add Peer nodes to application channels, obtain the list of application channels to which the Peer node is added, update application channel configurations, configure signature transaction files, and obtain information about the specified application channelsCreate, FETCH, Join, list, update, SignConfigTX, getInfoEqual subcommand;
  • Chaincode subcommands: used to install chain codes, instantiate (deploy) chain codes, invoke chain codes, package chain codes, query chain codes, signature chaincode packages, upgrade chain codes, obtain channel chaincode list, etc., includingInstall, Instantiate, Invoke, Package, Query, SignPackage, Upgrade, listEqual subcommand;
  • Node node commands: used to manage node service processes and query service status, includingThe start, the status,Equal subcommand;
  • Logging Subcommand: Used to obtain, set, and restore log level functions, includingGetlevel, setLevel, revertLevelsEqual subcommand;
  • Version Version subcommand: Used to print informationFabricIn thePeerNode server version information.
viper.SetEnvPrefix(cmdRoot)               // Set the environment variable prefix core
	viper.AutomaticEnv()                      // Find matching environment variables
	replacer := strings.NewReplacer("."."_") // Create a replacement
	viper.SetEnvKeyReplacer(replacer)         // Set the environment variable substitution
	// Define a set of command-line options that are valid for all peers and their subcommands
	mainFlags := mainCmd.PersistentFlags()
	// Set the binding of version and logging-level options
	mainFlags.BoolVarP(&versionFlag, "version"."v".false."Display current version of fabric peer server")
	mainFlags.String("logging-level".""."Default logging level and overrides, see core.yaml for full syntax")
	// Viper configures the binding command line option
	viper.BindPFlag("logging_level", mainFlags.Lookup("logging-level"))
	// Register subcommands
	mainCmd.AddCommand(version.Cmd())       // version subcommand
	mainCmd.AddCommand(node.Cmd())          // node subcommands start and status
	mainCmd.AddCommand(chaincode.Cmd(nil))  // Chaincode subcommand install etc
	mainCmd.AddCommand(clilogging.Cmd(nil)) // cli sub command getlevel
	mainCmd.AddCommand(channel.Cmd(nil))    // channel subcommand create, etc
	// Load the configuration file core.yaml
	err := common.InitConfig(cmdRoot)
	...
	runtime.GOMAXPROCS(viper.GetInt("peer.gomaxprocs")) // Set the maximum number of available CPU cores
	// setup system-wide logging backend based on settings from core.yaml
	// Initialize the system log backend
	flogging.InitBackend(flogging.SetFormat(viper.GetString("logging.format")), logOutput)
Copy the code

Initialize the local MSP component

MSP component is an important security module for managing local membership. It encapsulates the root CA certificate and local signer entity.

// Initialize the local MSP component object
	var mspMgrConfigDir = config.GetPath("peer.mspConfigPath") // Obtain the path of the MSP configuration file
	var mspID = viper.GetString("peer.localMspId")             // Get the local MSP name
	var mspType = viper.GetString("peer.localMspType")         // Get the local MSP component type
	if mspType == "" {
		// The MSP component type is FABRIC by default
		mspType = msp.ProviderTypeToString(msp.FABRIC)
	}
	// Obtain the configuration information of the BCCSP component and initialize the MSP component object
	err = common.InitCrypto(mspMgrConfigDir, mspID, mspType)
Copy the code

Execute the master command

The function is as follows:

ifmainCmd.Execute() ! =nil {}
Copy the code

Next to the Execute () function to analysis: / vendor/github.com/spf13/cobra/command.go/ExecuteC ()

func (c *Command) ExecuteC(a) (cmd *Command, err error) {}
Copy the code

Call the Execute() method through Cobra and run the peer node start command to start the peer node. The Cobra component parses the command-line options entered by the user and then executes all the relevant execution methods defined in the nodeStartCmd object, in the following order as defined in the cobra.Com command

  • / PersistentPreRun PersistentPreRunE () ();
  • / PreRun PreRunE () ();
  • RunE/Run () ();
  • / PostRun PostRunE () ();
  • PersistentPostRunE()/PersistentPostRun();

So far, the node command starts execution, because this part is mainly about node startup, so the following focuses on the running process of node startup command execution.

The node starts the command execution

The command to start a node can be found in the following code path:

mainCmd.AddCommand(node.Cmd()) 
Copy the code
func Cmd(a) *cobra.Command {
	nodeCmd.AddCommand(startCmd())
	nodeCmd.AddCommand(statusCmd())
	return nodeCmd
}
Copy the code

Only node start commands are discussed here

func startCmd(a) *cobra.Command{...return nodeStartCmd
}
Copy the code
var nodeStartCmd = &cobra.Command{
...
		return serve(args)
	},
}
Copy the code

Enter the serve function discussion:

Initializing resources

① Obtain the local MSP component type and check the MSP component type

Currently, Hyperledger Fabric supports Fabric MSP and IDEMIX MSP. By default, the Fabric MSP based on BCCSP is used.

mspType := mgmt.GetLocalMSP().GetType() // Get the local MSP component type
	ifmspType ! = msp.FABRIC {// Check the MSP component type
		panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
	}
Copy the code

② : Initializes the resource access policy provider

aclmgmt.RegisterACLProvider(nil)
Copy the code

③ : Initialize the local ledger manager

ledgermgmt.Initialize(peer.ConfigTxProcessors) 	
Copy the code

core/ledger/ledgermgmt/ledger_mgmt.go/initialize

func initialize(customTxProcessors customtx.Processors) {
	logger.Info("Initializing ledger mgmt")
	lock.Lock()
	defer lock.Unlock() // Set the initialization flag of the Peer node to true
	initialized = true
	// Create an open ledger dictionary openedLedgers
	openedLedgers = make(map[string]ledger.PeerLedger)
	// Initialize the transaction message handler dictionary to the global variable Processors dictionary
	customtx.Initialize(customTxProcessors)
	cceventmgmt.Initialize()                // Initializes the chain event manager
	provider, err := kvledger.NewProvider() // Create a local Peer node ledger provider
	iferr ! =nil {
		panic(fmt.Errorf("Error in instantiating ledger provider: %s", err))
	}
	provider.Initialize(kvLedgerStateListeners) // Initialize the status listener
	ledgerProvider = provider                   // Set to the global default Peer node ledger provider
	logger.Info("ledger mgmt initialized")}Copy the code

The original providers are as follows:

  • Ledger ID database (idStore type) : LevelDB database that stores ledger ID (chain ID) and key value pairs of Genesis block;

  • Ledgerstorage. Provider: creates ledger data storage objects and manages block data files, privacy databases, block index databases, etc.

  • History database provider (HistoryDBProvider type) : Creates a history database that stores historical information about each state data;

  • Stateful Database provider (CommonStorageDBProvider type) : Creates a stateful database (LevelDB or CouchDB type) that stores the world state, including public and private data for valid transactions.

④ : Initializes server parameters

if chaincodeDevMode {
		// Set the chain mode
		viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)

	}
	// Read the configuration and cache the Peer node address and endpoint
	iferr := peer.CacheConfiguration(); err ! =nil {
		return err
	}
	// Get the cached Peer endpoint
	peerEndpoint, err := peer.GetPeerEndpoint()
	...
	var peerHost string
	// Obtain the IP address of the Peer node. Note that the IP address and port have been separated
	peerHost, _, err = net.SplitHostPort(peerEndpoint.Address)
Copy the code

Create a GRPC server

1: Create a gRPC server

The serve() function creates at least three gRPC servers (separate ports) for registering the Peer node function server, as shown below:

The serial number port Functional server instructions The service interface
1 7051 DeliverEvents Event server Process block request messages Deliver()
7051 The Admin server Obtain node status and maintenance logs GetStatus()
7051 Endorser Endorser server Offer endorsement service ProcessProposal()
7051 Gossip message server Data distribution and synchronization status of nodes within an organization GossipStream()
2 7052 ChaincodeSupport Support server Provides the link codes of Peer nodes Register()
3 7053 EventHub Event server Provide subscription event service (deprecated in 1.3.0) Chat()
serverConfig, err := peer.GetServerConfig()
	...
peerServer, err := peer.CreatePeerServer(listenAddr, serverConfig)
Copy the code

② : Create the EventHub event server

	if serverConfig.SecOpts.UseTLS {
		...
		cs := comm.GetCredentialSupport() // Create the CredentialSupport structure object
		cs.ServerRootCAs = serverConfig.SecOpts.ServerRootCAs
		//// Obtain the gRPC client certificate for TLS connection authentication
		clientCert, err := peer.GetClientCertificate()
		// Set the client certificate
		comm.GetCredentialSupport().SetClientCertificate(clientCert)
	}
	//// Create event EventHub server (port 7053)
	ehubGrpcServer, err := createEventHubServer(serverConfig)
Copy the code

③ create the DeliverEvents event server

The serve() function checks if two-way TLS security authentication is enabled, the serve() function sets the mutualTLS flag bit to true and defines the policyCheckerProvider() function to obtain the resource policy checker. This function directly calls the CheckACL() method of the global variable aclProvider object to check whether the signed message meets the access control policy for the specified resource on the channelID.

Then serve () function call peer. NewDeliverEventsServer () function, based on the parameters such as mutualTLS, policy – CheckerProvider create server abServer DeliverEvents events, Deliver() and DeliverFiltered() service interfaces are provided for processing messages requesting normal blocks and filtered blocks, respectively.

The pb.registerDeliverServer () method is then called to register the DeliverEvents event server abServer with the default gRPC server (port 7051) to provide local event services.

//// Check whether two-way TLS security authentication is enabled
	mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
	//// Defines the function for checking resource access permission policies
	policyCheckerProvider := func(resourceName string) deliver.PolicyChecker {
		return func(env *cb.Envelope, channelID string) error {
			return aclmgmt.GetACLProvider().CheckACL(resourceName, channelID, env)
		}
	}
	// Create DeliverEvents event server and register with Peer gRPC server (port 7051)
	abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverSupportManager{})
	pb.RegisterDeliverServer(peerServer.Server(), abServer)
Copy the code

④ Create a ChaincodeSupport support server

// Create a dedicated gRPC server and ChaincodeSupport (dedicated port or 7052 port)
	ccSrv, ccEndpoint, err := createChaincodeServer(ca, peerHost)
	iferr ! =nil {
		logger.Panicf("Failed to create chaincode server: %s", err)
	}
	// Register the ChaincodeSupoort object with the Peer gRPC server
	// Register system chain code at the same time to support deployment call system chain code
	registerChaincodeSupport(ccSrv, ccEndpoint, ca)
	go ccSrv.Start() // Start the gRPC server to provide chain support services
Copy the code

⑤ : Create Admin management server and Endorser endorsement server

	// Create Admin server and Admin server
	pb.RegisterAdminServer(peerServer.Server(), core.NewAdminServer())
	// Define the Gossip protocol to distribute private data functions
	privDataDist := func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
		return service.GetGossipService().DistributePrivateData(channel, txID, privateData)
	}
	// Create an EndorserServer EndorserServer
	serverEndorser := endorser.NewEndorserServer(privDataDist, &endorser.SupportImpl{})
	libConf := library.Config{}
	if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err ! =nil {
		return errors.WithMessage(err, "could not load YAML config")}//// Create a list of message filters
	authFilters := library.InitRegistry(libConf).Lookup(library.Auth).([]authHandler.Filter)
	// Construct all message filters as message Filter chains and return the first Filter (Filter type, implementing EndorserServer // interface)
	auth := authHandler.ChainFilters(serverEndorser, authFilters...)
	// Register the Endorser server
	//// Register the EndorserServer to the gRPC server
	pb.RegisterEndorserServer(peerServer.Server(), auth)
Copy the code

⑥ : Create the Gossip message server

	// Get the initial node address list for the Bootstrap connection. The default is 127.0.0.1:7051
	bootstrap := viper.GetStringSlice("peer.gossip.bootstrap")

	//// Gets the local MSP signer identity entity and serializes it
	serializedIdentity, err := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
	...
	messageCryptoService := peergossip.NewMCS( Construct the Gossip message encryption service component
		peer.NewChannelPolicyManagerGetter(), // The channel Policy Manager gets the component
		localmsp.NewSigner(),                 // Local signers
		mgmt.NewDeserializersManager())       // Deserialize the identity component manager
	secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())

	// callback function for secure dial options for gossip service
	// Define the Gossip server callback function to create gRPC dialup connection options for the Gossip server security configuration
	secureDialOpts := func(a) []grpc.DialOption {
		var dialOpts []grpc.DialOption
		// set max send/recv msg sizes
		dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()),
			grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize()))) // Set the maximum number of bytes for sending and receiving messages
		// set the keepalive options
		kaOpts := comm.DefaultKeepaliveOptions() // Get the default keepalive option for heartbeat messages.// Set the keepalive option for heartbeat communication in gRPC dialup connection options
		dialOpts = append(dialOpts, comm.ClientKeepaliveOptions(kaOpts)...)

		if comm.TLSEnabled() { // Enable TLS security authentication and set the CLIENT TLS communication certificate
			dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCredentialSupport().GetPeerCredentials()))
		} else {
			dialOpts = append(dialOpts, grpc.WithInsecure()) // Otherwise, disable TLS security authentication
		}
		return dialOpts
	}

	// Check whether TLS security authentication is enabled on the gRPC server and obtain and set the identity certificates of the server and client
	var certs *common2.TLSCertificates
	if peerServer.TLSEnabled() {
		serverCert := peerServer.ServerCertificate()
		clientCert, err := peer.GetClientCertificate()
		iferr ! =nil {
			return errors.Wrap(err, "failed obtaining client certificates")
		}
		certs = &common2.TLSCertificates{}
		certs.TLSServerCert.Store(&serverCert)
		certs.TLSClientCert.Store(&clientCert)
	}

	// Create the Gossip message server instance gossipServiceInstance
	err = service.InitGossipService(serializedIdentity, peerEndpoint.Address, peerServer.Server(), certs,
		messageCryptoService, secAdv, secureDialOpts, bootstrap...)
Copy the code

Deploys the system chain code and initializes the chain structure of existing channels

① : Deploy the system chain code

	// Deploy the system chain code
	initSysCCs()
Copy the code

② : Initializes the chain structure on the existing channel

// Initializes the chain structure on the existing channel
	peer.Initialize(func(cid string){})Copy the code

core/peer/peer.go/Initialize

func Initialize(init func(string)) {
	nWorkers := viper.GetInt("peer.validatorPoolSize") // Get the number of transaction validation threads
	if nWorkers <= 0 {
		nWorkers = runtime.NumCPU()
	}
	//// Sets the number of concurrent semaphore accesses
	validationWorkersSemaphore = semaphore.NewWeighted(int64(nWorkers))

	chainInitializer = init // Set the initialization function

	var cb *common.Block
	var ledger ledger.PeerLedger
	//// Initialize the ledger manager
	ledgermgmt.Initialize(ConfigTxProcessors)
	//// gets a list of account ids under the current account manager
	ledgerIds, err := ledgermgmt.GetLedgerIDs()
	iferr ! =nil {
		panic(fmt.Errorf("Error in initializing ledgermgmt: %s", err))
	}
	for _, cid := range ledgerIds {
		peerLogger.Infof("Loading chain %s", cid)
		// Create a local Peer account
		ifledger, err = ledgermgmt.OpenLedger(cid); err ! =nil {
			peerLogger.Warningf("Failed to load ledger %s(%s)", cid, err)
			peerLogger.Debugf("Error while loading ledger %s with message %s. We continue to the next ledger rather than abort.", cid, err)
			continue
		}
		//// gets the latest configuration block from the specified channel ledger
		ifcb, err = getCurrConfigBlockFromLedger(ledger); err ! =nil {
			peerLogger.Warningf("Failed to find config block on ledger %s(%s)", cid, err)
			peerLogger.Debugf("Error while looking for config block on ledger %s with message %s. We continue to the next ledger rather than abort.", cid, err)
			continue
		}
		// Create a chain if we get a valid ledger with config block
		//// Creates a chain structure for the specified channel on the Peer node
		iferr = createChain(cid, ledger, cb); err ! =nil {
			peerLogger.Warningf("Failed to load chain %s(%s)", cid, err)
			peerLogger.Debugf("Error reloading chain %s with message %s. We continue to the next chain rather than abort.", cid, err)
			continue
		}
		// Initialize the channel chain structure with custom functions, such as deployment system chain code
		InitChain(cid)
	}
}
Copy the code

Start the gRPC server and profile server

// Establish a channel for passing error messages
	serve := make(chan error)
	// Signal channel
	sigs := make(chan os.Signal, 1)
	// Set the process signal channel notification signal, including interrupt/terminate signal
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func(a) {// Sets a specific notification signal for this process to block waiting
		sig := <-sigs // Read signal value from sigS channel, block wait mode
		logger.Debugf("sig: %s", sig)
		serve <- nil} ()// Use goroutine to start gRPC server (port 7051, registered Admin server, Endorser server, etc.)
	go func(a) {
		var grpcErr error
		ifgrpcErr = peerServer.Start(); grpcErr ! =nil { // Listening port (7051) provides services
			grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
		} else {
			logger.Info("peer server exited")
		}
		serve <- grpcErr // If you exit because of an error, send an error to the serve channel} ()// Write the running process ID to the process file
	if err := writePid(config.GetPath("peer.fileSystemPath") +"/peer.pid", os.Getpid()); err ! =nil {
		return err
	}

	// Start the event hub server
	// Start gRPC server based on dedicated event listening port (port 7053, registered EventHub event server)
	ifehubGrpcServer ! =nil {
		go ehubGrpcServer.Start()
	}

	// Start profiling http endpoint if enabled
	// If the profile enable flag is turned on, the provisioning service is started
	if viper.GetBool("peer.profile.enabled") {
		go func(a) { // Start the GO Profile server. If there is an error, no error message will be sent
			// Get the profile listening address
			profileListenAddress := viper.GetString("peer.profile.listenAddress")
			logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
			if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr ! =nil {
				logger.Errorf("Error starting profiler: %s", profileErr)
			}
		}()
	}

Copy the code

At this point, the Peer node and its function server have been started.


reference

Github.com/blockchainG…