ES5.4 source code analysis node start process

The entrance

if [[ $DAEMONIZE = false ]]; then
  exec \
    "$JAVA" \
    "$XSHARE" \
    $ES_JAVA_OPTS \
    -Des.path.home="$ES_HOME" \
    -Des.path.conf="$ES_PATH_CONF" \
    -Des.distribution.flavor="$ES_DISTRIBUTION_FLAVOR" \
    -Des.distribution.type="$ES_DISTRIBUTION_TYPE" \
    -Des.bundled_jdk="$ES_BUNDLED_JDK" \
    -cp "$ES_CLASSPATH" \
    org.elasticsearch.bootstrap.Elasticsearch \
    "$@" <<<"$KEYSTORE_PASSWORD"
else
  exec \
    "$JAVA" \
    "$XSHARE" \
    $ES_JAVA_OPTS \
    -Des.path.home="$ES_HOME" \
    -Des.path.conf="$ES_PATH_CONF" \
    -Des.distribution.flavor="$ES_DISTRIBUTION_FLAVOR" \
    -Des.distribution.type="$ES_DISTRIBUTION_TYPE" \
    -Des.bundled_jdk="$ES_BUNDLED_JDK" \
    -cp "$ES_CLASSPATH" \
    org.elasticsearch.bootstrap.Elasticsearch \
    "$@" \
    <<<"$KEYSTORE_PASSWORD" &
  retval=$?
Copy the code

The flow chart

  1. Parses configuration files and command parameters
  2. Check the external environment and internal environment JVM version and operating system
  3. Initialize internal resources and create internal modules
  4. Start each submodule and keep thread

org.elasticsearch.bootstrap.Elasticsearch

main

public static void main(final String[] args) throws Exception {
        // we want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
        // presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy)
        System.setSecurityManager(new SecurityManager() {
            @Override
            public void checkPermission(Permission perm) {
                // grant all permissions so that we can later set the security manager to the one that we want}}); LogConfigurator.registerErrorListener();final Elasticsearch elasticsearch = new Elasticsearch();
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        if (status != ExitCodes.OK) {
            exit(status);
        }
    }
Copy the code

Main (args, elasticSearch, terminal.default) calls the main method of the Command class

Elasticsearch constructor

 Elasticsearch() {
        super("starts elasticsearch");
        versionOption = parser.acceptsAll(Arrays.asList("V"."version"),
            "Prints elasticsearch version information and exits");
        daemonizeOption = parser.acceptsAll(Arrays.asList("d"."daemonize"),
            "Starts Elasticsearch in the background")
            .availableUnless(versionOption);
        pidfileOption = parser.acceptsAll(Arrays.asList("p"."pidfile"),
            "Creates a pid file in the specified path on start")
            .availableUnless(versionOption)
            .withRequiredArg()
            .withValuesConvertedBy(new PathConverter());
        quietOption = parser.acceptsAll(Arrays.asList("q"."quiet"),
            "Turns off standard ouput/error streams logging in console")
            .availableUnless(versionOption)
            .availableUnless(daemonizeOption);
    }
Copy the code
  • Mainly do analytical parameters

org.elasticsearch.cli.Command#main

public final int main(String[] args, Terminal terminal) throws Exception {
    if (addShutdownHook()) {
        shutdownHookThread.set(new Thread(() -> {
            try {
                this.close();
            } catch (final IOException e) {
                try (
                    StringWriter sw = new StringWriter();
                    PrintWriter pw = new PrintWriter(sw)) {
                    e.printStackTrace(pw);
                    terminal.println(sw.toString());
                } catch (final IOException impossible) {
                    // StringWriter#close declares a checked IOException from the Closeable interface but the Javadocs for StringWriter
                    // say that an exception here is impossible
                    throw newAssertionError(impossible); }}})); Runtime.getRuntime().addShutdownHook(shutdownHookThread.get()); }if (shouldConfigureLoggingWithoutConfig()) {
        // initialize default for es.logger.level because we will not read the log4j2.properties
        final String loggerLevel = System.getProperty("es.logger.level", Level.INFO.name());
        final Settings settings = Settings.builder().put("logger.level", loggerLevel).build();
        LogConfigurator.configureWithoutConfig(settings);
    }

    try {
        mainWithoutErrorHandling(args, terminal);
    } catch (OptionException e) {
        printHelp(terminal);
        terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
        return ExitCodes.USAGE;
    } catch (UserException e) {
        if (e.exitCode == ExitCodes.USAGE) {
            printHelp(terminal);
        }
        terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
        return e.exitCode;
    }
    return ExitCodes.OK;
}
Copy the code
  • Adopting a template approach

    • Added thread closing hooks to close some resources

    • Configuring Log Information

    • Mainwithterrorhandling main processing methods

      • void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {	
        		// Parameter parsing
            final OptionSet options = parser.parse(args);
        
            if (options.has(helpOption)) {
                printHelp(terminal);
                return;
            }
        
            if (options.has(silentOption)) {
                terminal.setVerbosity(Terminal.Verbosity.SILENT);
            } else if (options.has(verboseOption)) {
                terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
            } else {
                terminal.setVerbosity(Terminal.Verbosity.NORMAL);
            }
        		// True execution
            execute(terminal, options);
        }
        Copy the code
        • Execute is an abstract method implemented by subclasses

          • Start with Exec of EnvironmentAwareCommand

            • @Override
              protected void execute(Terminal terminal, OptionSet options) throws Exception {
                  final Map<String, String> settings = new HashMap<>();
                  for (final KeyValuePair kvp : settingOption.values(options)) {
                      if (kvp.value.isEmpty()) {
                          throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
                      }
                      if (settings.containsKey(kvp.key)) {
                          final String message = String.format(
                                  Locale.ROOT,
                                  "setting [%s] already set, saw [%s] and [%s]",
                                  kvp.key,
                                  settings.get(kvp.key),
                                  kvp.value);
                          throw new UserException(ExitCodes.USAGE, message);
                      }
                      settings.put(kvp.key, kvp.value);
                  }
              
                  putSystemPropertyIfSettingIsMissing(settings, "default.path.conf"."es.default.path.conf");
                  putSystemPropertyIfSettingIsMissing(settings, "default.path.data"."es.default.path.data");
                  putSystemPropertyIfSettingIsMissing(settings, "default.path.logs"."es.default.path.logs");
                  putSystemPropertyIfSettingIsMissing(settings, "path.conf"."es.path.conf");
                  putSystemPropertyIfSettingIsMissing(settings, "path.data"."es.path.data");
                  putSystemPropertyIfSettingIsMissing(settings, "path.home"."es.path.home");
                  putSystemPropertyIfSettingIsMissing(settings, "path.logs"."es.path.logs");
              		// Implemented by subclasses
                  execute(terminal, options, createEnv(terminal, settings));
              }
              Copy the code
              • org.elasticsearch.bootstrap.Elasticsearch#execute

              • @Override
                protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
                    if (options.nonOptionArguments().isEmpty() == false) {
                        throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
                    }
                    if (options.has(versionOption)) {
                        if (options.has(daemonizeOption) || options.has(pidfileOption)) {
                            throw new UserException(ExitCodes.USAGE, "Elasticsearch version option is mutually exclusive with any other option");
                        }
                        terminal.println("Version: " + org.elasticsearch.Version.CURRENT
                                + ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date()
                                + ", JVM: " + JvmInfo.jvmInfo().version());
                        return;
                    }
                
                    final boolean daemonize = options.has(daemonizeOption);
                    final Path pidFile = pidfileOption.value(options);
                    final boolean quiet = options.has(quietOption);
                
                    try {
                        init(daemonize, pidFile, quiet, env);
                    } catch (NodeValidationException e) {
                        throw newUserException(ExitCodes.CONFIG, e.getMessage()); }}Copy the code
                void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
                    throws NodeValidationException, UserException {
                    try{ Bootstrap.init(! daemonize, pidFile, quiet, initialEnv); }catch (BootstrapException | RuntimeException e) {
                        // format exceptions to the console in a special way
                        // to avoid 2MB stacktraces from guice, etc.
                        throw newStartupException(e); }}Copy the code
                • Bootstrap#init

                  static void init(
                          final boolean foreground,
                          final Path pidFile,
                          final boolean quiet,
                          final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
                      // 
                      BootstrapInfo.init();
                  
                      INSTANCE = new Bootstrap();
                  
                      final SecureSettings keystore = loadSecureSettings(initialEnv);
                      Environment environment = createEnvironment(foreground, pidFile, keystore, initialEnv.settings());
                      try {
                          LogConfigurator.configure(environment);
                      } catch (IOException e) {
                          throw new BootstrapException(e);
                      }
                      checkForCustomConfFile();
                      checkConfigExtension(environment.configExtension());
                  
                      if(environment.pidFile() ! =null) {
                          try {
                              PidFile.create(environment.pidFile(), true);
                          } catch (IOException e) {
                              throw newBootstrapException(e); }}final boolean closeStandardStreams = (foreground == false) || quiet;
                      try {
                          if (closeStandardStreams) {
                              final Logger rootLogger = ESLoggerFactory.getRootLogger();
                              final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                              if(maybeConsoleAppender ! =null) {
                                  Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                              }
                              closeSystOut();
                          }
                  
                          // fail if somebody replaced the lucene jars
                          checkLucene();
                  
                          // install the default uncaught exception handler; must be done before security is
                          // initialized as we do not want to grant the runtime permission
                          // setDefaultUncaughtExceptionHandler
                          Thread.setDefaultUncaughtExceptionHandler(
                              new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));
                  
                          INSTANCE.setup(true, environment);
                  
                          try {
                              // any secure settings must be read during node construction
                              IOUtils.close(keystore);
                          } catch (IOException e) {
                              throw new BootstrapException(e);
                          }
                  
                          INSTANCE.start();
                  
                          if(closeStandardStreams) { closeSysError(); }}catch (NodeValidationException | RuntimeException e) {
                          // disable console logging, so user does not see the exception twice (jvm will show it already)
                          final Logger rootLogger = ESLoggerFactory.getRootLogger();
                          final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                          if(foreground && maybeConsoleAppender ! =null) {
                              Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                          }
                          Logger logger = Loggers.getLogger(Bootstrap.class);
                          if(INSTANCE.node ! =null) {
                              logger = Loggers.getLogger(Bootstrap.class, Node.NODE_NAME_SETTING.get(INSTANCE.node.settings()));
                          }
                          // HACK, it sucks to do this, but we will run users out of disk space otherwise
                          if (e instanceof CreationException) {
                              // guice: log the shortened exc to the log file
                              ByteArrayOutputStream os = new ByteArrayOutputStream();
                              PrintStream ps = null;
                              try {
                                  ps = new PrintStream(os, false."UTF-8");
                              } catch (UnsupportedEncodingException uee) {
                                  assert false;
                                  e.addSuppressed(uee);
                              }
                              new StartupException(e).printStackTrace(ps);
                              ps.flush();
                              try {
                                  logger.error("Guice Exception: {}", os.toString("UTF-8"));
                              } catch (UnsupportedEncodingException uee) {
                                  assert false; e.addSuppressed(uee); }}else if (e instanceof NodeValidationException) {
                              logger.error("node validation exception\n{}", e.getMessage());
                          } else {
                              // full exception
                              logger.error("Exception", e);
                          }
                          // re-enable it if appropriate, so they can see any logging during the shutdown process
                          if(foreground && maybeConsoleAppender ! =null) {
                              Loggers.addAppender(rootLogger, maybeConsoleAppender);
                          }
                  
                          throwe; }}Copy the code
                  • Init is an empty method that makes it easier for subclasses to do some initialization

                  • INSTANCE = new Bootstrap(); Create an instance

                  • private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
                    Bootstrap() {
                        keepAliveThread = new Thread(new Runnable() {
                            @Override
                            public void run(a) {
                                try {
                                    keepAliveLatch.await();
                                } catch (InterruptedException e) {
                                    // bail out}}},"elasticsearch[keepAlive/" + Version.CURRENT + "]");
                        keepAliveThread.setDaemon(false);
                        // keep this thread alive (non daemon thread) until we shutdown
                        Runtime.getRuntime().addShutdownHook(new Thread() {
                            @Override
                            public void run(a) { keepAliveLatch.countDown(); }}); }Copy the code
                    • Keepthreads themselves do no specific work.
                    • The keepThread thread is the only user thread that keeps the process running
                  • final SecureSettings keystore = loadSecureSettings(initialEnv); Loading security Configuration

                  • private static SecureSettings loadSecureSettings(Environment initialEnv) throws BootstrapException {
                        final KeyStoreWrapper keystore;
                        try {
                            keystore = KeyStoreWrapper.load(initialEnv.configFile());
                        } catch (IOException e) {
                            throw new BootstrapException(e);
                        }
                        if (keystore == null) {
                            return null; // no keystore
                        }
                    
                        try {
                            keystore.decrypt(new char[0] / *TODO: read password from stdin */);
                        } catch (Exception e) {
                            throw new BootstrapException(e);
                        }
                        return keystore;
                    }
                    Copy the code
                • KeyStoreWrapper.load

                • public static KeyStoreWrapper load(Path configDir) throws IOException { Path keystoreFile = keystorePath(configDir); if (Files.exists(keystoreFile) == false) { return null; } // start lucene SimpleFSDirectory directory = new SimpleFSDirectory(configDir); try (IndexInput indexInput = directory.openInput(KEYSTORE_FILENAME, IOContext.READONCE)) { ChecksumIndexInput input = new BufferedChecksumIndexInput(indexInput); int formatVersion = CodecUtil.checkHeader(input, KEYSTORE_FILENAME, MIN_FORMAT_VERSION, FORMAT_VERSION); byte hasPasswordByte = input.readByte(); boolean hasPassword = hasPasswordByte == 1; if (hasPassword == false && hasPasswordByte ! = 0) { throw new IllegalStateException("hasPassword boolean is corrupt: " + String.format(Locale.ROOT, "%02x", hasPasswordByte)); } String type = input.readString(); String stringKeyAlgo = input.readString(); final String fileKeyAlgo; if (formatVersion >= 2) { fileKeyAlgo = input.readString(); } else { fileKeyAlgo = NEW_KEYSTORE_FILE_KEY_ALGO; } final Map<String, KeyType> settingTypes; if (formatVersion >= 2) { settingTypes = input.readMapOfStrings().entrySet().stream().collect(Collectors.toMap( Map.Entry::getKey, e -> KeyType.valueOf(e.getValue()))); } else { settingTypes = new HashMap<>(); } byte[] keystoreBytes = new byte[input.readInt()]; input.readBytes(keystoreBytes, 0, keystoreBytes.length); CodecUtil.checkFooter(input); return new KeyStoreWrapper(formatVersion, hasPassword, type, stringKeyAlgo, fileKeyAlgo, settingTypes, keystoreBytes); }}Copy the code
                • Path keystoreFile = keystorePath(configDir)

            • The core code is the Node class

              • A constructor

                • protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
                      final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
                      boolean success = false;
                      {
                          // use temp logger just to say we are starting. we can't use it later on because the node name might not be set
                          Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
                          logger.info("initializing ...");
                  
                      }
                      try {
                          Settings tmpSettings = Settings.builder().put(environment.settings())
                              .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
                  
                          tmpSettings = TribeService.processSettings(tmpSettings);
                  
                          // create the node environment as soon as possible, to recover the node id and enable logging
                          try {
                              nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
                              resourcesToClose.add(nodeEnvironment);
                          } catch (IOException ex) {
                              throw new IllegalStateException("Failed to create node environment", ex);
                          }
                          final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
                          Logger logger = Loggers.getLogger(Node.class, tmpSettings);
                          final String nodeId = nodeEnvironment.nodeId();
                          tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
                          if (DiscoveryNode.nodeRequiresLocalStorage(tmpSettings)) {
                              checkForIndexDataInDefaultPathData(tmpSettings, nodeEnvironment, logger);
                          }
                          // this must be captured after the node name is possibly added to the settings
                          final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
                          if (hadPredefinedNodeName == false) {
                              logger.info("node name [{}] derived from node ID [{}]; set [{}] to override", nodeName, nodeId, NODE_NAME_SETTING.getKey());
                          } else {
                              logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
                          }
                  
                          final JvmInfo jvmInfo = JvmInfo.jvmInfo();
                          logger.info(
                              "version[{}], pid[{}], build[{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
                              displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),
                              jvmInfo.pid(),
                              Build.CURRENT.shortHash(),
                              Build.CURRENT.date(),
                              Constants.OS_NAME,
                              Constants.OS_VERSION,
                              Constants.OS_ARCH,
                              Constants.JVM_VENDOR,
                              Constants.JVM_NAME,
                              Constants.JAVA_VERSION,
                              Constants.JVM_VERSION);
                          logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
                          warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);
                  
                          if (logger.isDebugEnabled()) {
                              logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
                                  environment.configFile(), Arrays.toString(environment.dataFiles()), environment.logsFile(), environment.pluginsFile());
                          }
                          // TODO: Remove this in Elasticsearch 6.0.0
                          if (JsonXContent.unquotedFieldNamesSet) {
                              DeprecationLogger dLogger = new DeprecationLogger(logger);
                              dLogger.deprecated("[{}] has been set, but will be removed in Elasticsearch 6.0.0",
                                  JsonXContent.JSON_ALLOW_UNQUOTED_FIELD_NAMES);
                          }
                  
                          this.pluginsService = new PluginsService(tmpSettings, environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
                          this.settings = pluginsService.updatedSettings();
                          localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
                  
                          // create the environment based on the finalized (processed) view of the settings
                          // this is just to makes sure that people get the same settings, no matter where they ask them from
                          this.environment = new Environment(this.settings);
                          Environment.assertEquivalent(environment, this.environment);
                  
                  
                          finalList<ExecutorBuilder<? >> executorBuilders = pluginsService.getExecutorBuilders(settings);final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
                          resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
                          // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
                          DeprecationLogger.setThreadContext(threadPool.getThreadContext());
                          resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
                  
                          finalList<Setting<? >> additionalSettings =new ArrayList<>(pluginsService.getPluginSettings());
                          final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
                          for (finalExecutorBuilder<? > builder : threadPool.builders()) { additionalSettings.addAll(builder.getRegisteredSettings()); } client =new NodeClient(settings, threadPool);
                          final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
                          final ScriptModule scriptModule = ScriptModule.create(settings, this.environment, resourceWatcherService,
                              pluginsService.filterPlugins(ScriptPlugin.class));
                          AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
                          additionalSettings.addAll(scriptModule.getSettings());
                          // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
                          // so we might be late here already
                          final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
                          scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
                          resourcesToClose.add(resourceWatcherService);
                          final NetworkService networkService = new NetworkService(settings,
                              getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
                          final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
                              localNodeFactory::getNode);
                          clusterService.addStateApplier(scriptModule.getScriptService());
                          resourcesToClose.add(clusterService);
                          final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment,
                              scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
                          final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
                  
                          ModulesBuilder modules = new ModulesBuilder();
                          // plugin modules must be added here, before others or we can get crazy injection errors...
                          for (Module pluginModule : pluginsService.createGuiceModules()) {
                              modules.add(pluginModule);
                          }
                          final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
                          modules.add(new NodeModule(this, monitorService));
                          ClusterModule clusterModule = new ClusterModule(settings, clusterService,
                              pluginsService.filterPlugins(ClusterPlugin.class));
                          modules.add(clusterModule);
                          IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
                          modules.add(indicesModule);
                  
                          SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
                          CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
                              settingsModule.getClusterSettings());
                          resourcesToClose.add(circuitBreakerService);
                          ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
                                  settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
                                  threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService);
                          modules.add(actionModule);
                          modules.add(new GatewayModule());
                  
                  
                          BigArrays bigArrays = createBigArrays(settings, circuitBreakerService);
                          resourcesToClose.add(bigArrays);
                          modules.add(settingsModule);
                          List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
                              NetworkModule.getNamedWriteables().stream(),
                              indicesModule.getNamedWriteables().stream(),
                              searchModule.getNamedWriteables().stream(),
                              pluginsService.filterPlugins(Plugin.class).stream()
                                  .flatMap(p -> p.getNamedWriteables().stream()),
                              ClusterModule.getNamedWriteables().stream())
                              .flatMap(Function.identity()).collect(Collectors.toList());
                          final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
                          NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
                              NetworkModule.getNamedXContents().stream(),
                              searchModule.getNamedXContents().stream(),
                              pluginsService.filterPlugins(Plugin.class).stream()
                                  .flatMap(p -> p.getNamedXContent().stream()),
                              ClusterModule.getNamedXWriteables().stream())
                              .flatMap(Function.identity()).collect(toList()));
                          final TribeService tribeService = new TribeService(settings, clusterService, nodeId, namedWriteableRegistry,
                              s -> newTribeClientNode(s, classpathPlugins));
                          resourcesToClose.add(tribeService);
                          modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
                          final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
                          final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
                              settingsModule.getClusterSettings(), analysisModule.getAnalysisRegistry(),
                              clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
                              threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),
                              clusterService, client, metaStateService);
                  
                          Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
                              .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
                                                               scriptModule.getScriptService(), xContentRegistry).stream())
                              .collect(Collectors.toList());
                          final RestController restController = actionModule.getRestController();
                          final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
                                  threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, restController);
                          Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
                              pluginsService.filterPlugins(Plugin.class).stream()
                                  .map(Plugin::getCustomMetaDataUpgrader)
                                  .collect(Collectors.toList());
                          Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders =
                              pluginsService.filterPlugins(Plugin.class).stream()
                                  .map(Plugin::getIndexTemplateMetaDataUpgrader)
                                  .collect(Collectors.toList());
                          Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream()
                                  .map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
                          final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);
                          new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
                          final Transport transport = networkModule.getTransportSupplier().get();
                          final TransportService transportService = newTransportService(settings, transport, threadPool,
                              networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
                          final SearchTransportService searchTransportService =  new SearchTransportService(settings,
                              transportService);
                          final Consumer<Binder> httpBind;
                          final HttpServerTransport httpServerTransport;
                          if (networkModule.isHttpEnabled()) {
                              httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
                              httpBind = b -> {
                                  b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
                              };
                          } else {
                              httpBind = b -> {
                                  b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
                              };
                              httpServerTransport = null;
                          }
                          final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService,
                              namedWriteableRegistry, networkService, clusterService, pluginsService.filterPlugins(DiscoveryPlugin.class));
                          NodeService nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
                              transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
                              httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
                  
                          modules.add(b -> {
                                  b.bind(NodeService.class).toInstance(nodeService);
                                  b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
                                  b.bind(PluginsService.class).toInstance(pluginsService);
                                  b.bind(Client.class).toInstance(client);
                                  b.bind(NodeClient.class).toInstance(client);
                                  b.bind(Environment.class).toInstance(this.environment);
                                  b.bind(ThreadPool.class).toInstance(threadPool);
                                  b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
                                  b.bind(TribeService.class).toInstance(tribeService);
                                  b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
                                  b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
                                  b.bind(BigArrays.class).toInstance(bigArrays);
                                  b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
                                  b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
                                  b.bind(IngestService.class).toInstance(ingestService);
                                  b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
                                  b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
                                  b.bind(MetaStateService.class).toInstance(metaStateService);
                                  b.bind(IndicesService.class).toInstance(indicesService);
                                  b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
                                      threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase()));
                                  b.bind(SearchTransportService.class).toInstance(searchTransportService);
                                  b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
                                          scriptModule.getScriptService()));
                                  b.bind(Transport.class).toInstance(transport);
                                  b.bind(TransportService.class).toInstance(transportService);
                                  b.bind(NetworkService.class).toInstance(networkService);
                                  b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
                                  b.bind(MetaDataIndexUpgradeService.class).toInstance(new MetaDataIndexUpgradeService(settings, xContentRegistry,
                                      indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders));
                                  b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
                                  b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
                                  {
                                      RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
                                      processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
                                      b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(settings, transportService,
                                              indicesService, recoverySettings, clusterService));
                                      b.bind(PeerRecoveryTargetService.class).toInstance(newPeerRecoveryTargetService(settings, threadPool, transportService, recoverySettings, clusterService)); } httpBind.accept(b); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); }); injector = modules.createInjector(); List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream() .filter(p -> pinstanceof LifecycleComponent)
                              .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
                          pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
                              .map(injector::getInstance).collect(Collectors.toList()));
                          resourcesToClose.addAll(pluginLifecycleComponents);
                          this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
                          client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
                                  () -> clusterService.localNode().getId());
                  
                          if (NetworkModule.HTTP_ENABLED.get(settings)) {
                              logger.debug("initializing HTTP handlers ...");
                              actionModule.initRestHandlers(() -> clusterService.state().nodes());
                          }
                          logger.info("initialized");
                  
                          success = true;
                      } catch (IOException ex) {
                          throw new ElasticsearchException("failed to bind service", ex);
                      } finally {
                          if(! success) { IOUtils.closeWhileHandlingException(resourcesToClose); }}}Copy the code
                  • Note:
                    • Building blocks
              • The start method

              public Node start(a) throws NodeValidationException {
                  if(! lifecycle.moveToStarted()) {return this;
                  }
              
                  Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
                  logger.info("starting ...");
                  // hack around dependency injection problem (for now...)
                  injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
                  pluginLifecycleComponents.forEach(LifecycleComponent::start);
              
                  injector.getInstance(MappingUpdatedAction.class).setClient(client);
                  injector.getInstance(IndicesService.class).start();
                  injector.getInstance(IndicesClusterStateService.class).start();
                  injector.getInstance(IndicesTTLService.class).start();
                  injector.getInstance(SnapshotsService.class).start();
                  injector.getInstance(SnapshotShardsService.class).start();
                  injector.getInstance(RoutingService.class).start();
                  injector.getInstance(SearchService.class).start();
                  injector.getInstance(MonitorService.class).start();
              
                  final ClusterService clusterService = injector.getInstance(ClusterService.class);
              
                  final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
                  nodeConnectionsService.start();
                  clusterService.setNodeConnectionsService(nodeConnectionsService);
              
                  // TODO hack around circular dependencies problems
                  injector.getInstance(GatewayAllocator.class).setReallocation(clusterService, injector.getInstance(RoutingService.class));
              
                  injector.getInstance(ResourceWatcherService.class).start();
                  injector.getInstance(GatewayService.class).start();
                  Discovery discovery = injector.getInstance(Discovery.class);
                  clusterService.setDiscoverySettings(discovery.getDiscoverySettings());
                  clusterService.addInitialStateBlock(discovery.getDiscoverySettings().getNoMasterBlock());
                  clusterService.setClusterStatePublisher(discovery::publish);
              
                  // start before the cluster service since it adds/removes initial Cluster state blocks
                  final TribeService tribeService = injector.getInstance(TribeService.class);
                  tribeService.start();
              
                  // Start the transport service now so the publish address will be added to the local disco node in ClusterService
                  TransportService transportService = injector.getInstance(TransportService.class);
                  transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
                  transportService.start();
                  validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress(), pluginsService.filterPlugins(Plugin.class).stream()
                      .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
              
                  clusterService.addStateApplier(transportService.getTaskManager());
                  clusterService.start();
                  assertlocalNodeFactory.getNode() ! =null;
                  assert transportService.getLocalNode().equals(localNodeFactory.getNode())
                      : "transportService has a different local node than the factory provided";
                  assert clusterService.localNode().equals(localNodeFactory.getNode())
                      : "clusterService has a different local node than the factory provided";
                  // start after cluster service so the local disco is known
                  discovery.start();
                  transportService.acceptIncomingRequests();
                  discovery.startInitialJoin();
                  // tribe nodes don't have a master so we shouldn't register an observer s
                  final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
                  if (initialStateTimeout.millis() > 0) {
                      final ThreadPool thread = injector.getInstance(ThreadPool.class);
                      ClusterState clusterState = clusterService.state();
                      ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
                      if (clusterState.nodes().getMasterNodeId() == null) {
                          logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
                          final CountDownLatch latch = new CountDownLatch(1);
                          observer.waitForNextChange(new ClusterStateObserver.Listener() {
                              @Override
                              public void onNewClusterState(ClusterState state) { latch.countDown(); }
              
                              @Override
                              public void onClusterServiceClose(a) {
                                  latch.countDown();
                              }
              
                              @Override
                              public void onTimeout(TimeValue timeout) {
                                  logger.warn("timed out while waiting for initial discovery state - timeout: {}", initialStateTimeout); latch.countDown(); } }, state -> state.nodes().getMasterNodeId() ! =null, initialStateTimeout);
              
                          try {
                              latch.await();
                          } catch (InterruptedException e) {
                              throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state"); }}}if (NetworkModule.HTTP_ENABLED.get(settings)) {
                      injector.getInstance(HttpServerTransport.class).start();
                  }
              
                  if (WRITE_PORTS_FILE_SETTING.get(settings)) {
                      if(NetworkModule.HTTP_ENABLED.get(settings)) { HttpServerTransport http = injector.getInstance(HttpServerTransport.class);  writePortsFile("http", http.boundAddress());
                      }
                      TransportService transport = injector.getInstance(TransportService.class);
                      writePortsFile("transport", transport.boundAddress());
                  }
              
                  // start nodes now, after the http server, because it may take some time
                  tribeService.startNodes();
                  logger.info("started");
              
                  return this;
              }
              Copy the code
              • Note:

                • Start each service

                  • TribeService#start

                  • TransportService#start

conclusion

  • The Lightweight IOC container Inject of Google is adopted
  • Core classes org. Elasticsearch. Node. Node# node