This is the tenth article in the ElasticSearch source code analysis series. Continuing from the previous article on ElasticSearch plugins, this one analyzes the thread pools built from the plugins' parameters.

In the previous article, we covered the basic concepts of ElasticSearch plugins and the PluginsService initialization performed during Node instantiation. This article continues to explore the PluginsService's role in Node instantiation: building the thread pool framework from the parameters it provides.

When the thread pool is initialized

When the Node completes the construction of the PluginsService, it immediately retrieves the list of ExecutorBuilders for the thread pool via the getExecutorBuilders method, as follows:

List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);

At this point, the PluginsService object holds all plugins that need to be loaded, including every component under the modules path and the plugins path, which are collectively called plugins here. As shown in the figure below, there are 13 loaded plugins in total: the 12 loaded by default from the modules path, plus 1 custom-installed plugin (the ICU analysis plugin) under the plugins path.

Build the thread pool framework

Initialize the ExecutorBuilder collection

During Node instantiation, the following code is used:

List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);

This collects the custom thread pool ExecutorBuilders contributed by the plugins. Once the full set of builders is available, the ThreadPool itself is constructed:

ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));

First, the number of available CPU cores is obtained:

Runtime.getRuntime().availableProcessors()

That value can, of course, be overridden by the processors setting in the node's Settings; the result is stored in the code as availableProcessors. Two derived values are then created:

  • halfProcMaxAt5: half of availableProcessors, but no more than 5.
  • halfProcMaxAt10: half of availableProcessors, but no more than 10.

These two variables are used repeatedly later in the creation of various thread pool constructors.

After determining the number of available CPUs, the maximum size of the generic thread pool (genericThreadPoolMax) can be computed. ElasticSearch sets genericThreadPoolMax to four times the number of available CPUs, clamped to the fixed range of 128 to 512.

On a typical server, this upper limit ends up at 128, which is a fairly generous ceiling.
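The clamping described above can be sketched in Java. Here `boundedBy` mirrors the helper of the same name in the ES source, while the exact rounding of the half-of-CPUs values is an assumption based on the description above:

```java
// Sketch of ThreadPool's sizing math (assumed from the description above,
// not a verbatim copy of the ES source).
public class PoolSizing {
    // Clamp value into the inclusive range [min, max].
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    // Half the CPU count, capped at 5 (and at least 1).
    static int halfProcMaxAt5(int procs)  { return boundedBy(procs / 2, 1, 5); }

    // Half the CPU count, capped at 10 (and at least 1).
    static int halfProcMaxAt10(int procs) { return boundedBy(procs / 2, 1, 10); }

    // Generic pool maximum: 4x CPUs, clamped to [128, 512].
    static int genericThreadPoolMax(int procs) {
        return boundedBy(4 * procs, 128, 512);
    }

    public static void main(String[] args) {
        int procs = Runtime.getRuntime().availableProcessors();
        System.out.println("halfProcMaxAt5       = " + halfProcMaxAt5(procs));
        System.out.println("halfProcMaxAt10      = " + halfProcMaxAt10(procs));
        System.out.println("genericThreadPoolMax = " + genericThreadPoolMax(procs));
    }
}
```

With 4 CPUs, as in the local run, genericThreadPoolMax comes out as 128 because 4 × 4 = 16 is below the floor of 128.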

Next, a Map<String, ExecutorBuilder> is populated, with the pool name as key and the corresponding ExecutorBuilder as value. One entry is created for each of the following thread pools:

  • Executor for generic operations: builds a scaling executor; the key is generic, the value a ScalingExecutorBuilder object. Its parameters and corresponding settings are:

    • Name: the name of the thread pool executor, i.e. generic.
    • Core: the minimum number of threads in the pool, fixed at 4; exposed as the setting `thread_pool.generic.core`.
    • Max: the maximum number of threads in the pool, the genericThreadPoolMax described above (128 in the local run); exposed as `thread_pool.generic.max`.
    • KeepAlive: how long an idle thread above the core count stays alive, fixed at 30 seconds; exposed as `thread_pool.generic.keep_alive`.
  • Executor for indexing operations: builds a fixed executor; the key is index, the value a FixedExecutorBuilder object. Its parameters and corresponding settings are:

    • Settings: the node's configuration settings.
    • Name: the name of the thread pool executor, i.e. index.
    • Size: the fixed number of threads, defaulting to the number of CPUs (4 in the local run); together with name this builds the setting `thread_pool.index.size`.
    • QueueSize: the size of the blocking queue, fixed at 200; exposed as `thread_pool.index.queue_size`.
  • Executor for bulk operations: builds a fixed executor; the key is bulk, the value a FixedExecutorBuilder object. Its parameters and corresponding settings are:

    • Settings: the node's configuration settings.
    • Name: the name of the thread pool executor, i.e. bulk.
    • Size: the fixed number of threads, defaulting to the number of CPUs (4 in the local run); together with name this builds the setting `thread_pool.bulk.size`.
    • QueueSize: the size of the blocking queue, fixed at 200; exposed as `thread_pool.bulk.queue_size`.
  • Executor for get operations: builds a fixed executor; the key is get, the value a FixedExecutorBuilder object. Its parameters and corresponding settings are:

    • Settings: the node's configuration settings.
    • Name: the name of the thread pool executor, i.e. get.
    • Size: the fixed number of threads, defaulting to the number of CPUs (4 in the local run); together with name this builds the setting `thread_pool.get.size`.
    • QueueSize: the size of the blocking queue, fixed at 1000; exposed as `thread_pool.get.queue_size`.
  • Executor for search operations: builds an executor whose queue length adjusts automatically according to Little's law. Its logic differs from the other builders and is more complex, reflecting ElasticSearch's special optimization of search. The key is search, the value an AutoQueueAdjustingExecutorBuilder object. Its parameters and corresponding settings are:

    • Settings: the node's configuration settings.
    • Name: the name of the thread pool executor, i.e. search.
    • Size: the fixed number of threads (7 in the local run); together with name this builds the setting `thread_pool.search.size`.
    • InitialQueueSize: the initial queue length, fixed at 1000; exposed as `thread_pool.search.queue_size`.
    • MinQueueSize: the minimum queue length, fixed at 1000; exposed as `thread_pool.search.min_queue_size`.
    • MaxQueueSize: the maximum queue length, fixed at 1000; exposed as `thread_pool.search.max_queue_size`.
    • FrameSize: the number of tasks measured between queue adjustments, fixed at 2000; exposed as `thread_pool.search.auto_queue_frame_size`.
    • The setting `thread_pool.search.target_response_time` for search defaults to 1s.
  • Executor for management operations: builds a scaling executor; the key is management, the value a ScalingExecutorBuilder object. Its parameters and corresponding settings are:

    • Name: the name of the thread pool executor, i.e. management.
    • Core: the minimum number of threads in the pool, fixed at 1 (matching the local run).
    • Max: the maximum number of threads in the pool, fixed at 5.
    • KeepAlive: how long an idle thread above the core count stays alive, fixed at 5 minutes; exposed as `thread_pool.management.keep_alive`.
  • Executor for listener operations: builds a fixed executor; the key is listener, the value a FixedExecutorBuilder object. Its parameters and corresponding settings are:

    • Settings: the node's configuration settings.
    • Name: the name of the thread pool executor, i.e. listener.
    • Size: the fixed number of threads, the halfProcMaxAt10 described above (2 in the local run); together with name this builds the setting `thread_pool.listener.size`.
    • QueueSize: the size of the blocking queue, set to **-1**, which means the queue is unbounded.
  • Executor for flush operations: builds a scaling executor; the key is flush, the value a ScalingExecutorBuilder object. Its parameters and corresponding settings are:

    • Name: the name of the thread pool executor, i.e. flush.
    • Core: the minimum number of threads in the pool, fixed at 1.
    • Max: the maximum number of threads in the pool, the halfProcMaxAt5 described above.
    • KeepAlive: how long an idle thread above the core count stays alive, fixed at 5 minutes; exposed as `thread_pool.flush.keep_alive`.
  • Executor for refresh operations: builds a scaling executor; the key is refresh, the value a ScalingExecutorBuilder object. Its parameters and corresponding settings are:

    • Name: the name of the thread pool executor, i.e. refresh.
    • Core: the minimum number of threads in the pool, fixed at 1.
    • Max: the maximum number of threads in the pool, the halfProcMaxAt10 described above.
    • KeepAlive: how long an idle thread above the core count stays alive, fixed at 5 minutes; exposed as `thread_pool.refresh.keep_alive`.
  • Executor for warmer operations: builds a scaling executor; the key is warmer, the value a ScalingExecutorBuilder object. Its parameters and corresponding settings are:

    • Name: the name of the thread pool executor, i.e. warmer.
    • Core: the minimum number of threads in the pool, fixed at 1.
    • Max: the maximum number of threads in the pool, the halfProcMaxAt5 described above.
    • KeepAlive: how long an idle thread above the core count stays alive, fixed at 5 minutes; exposed as `thread_pool.warmer.keep_alive`.
  • Executor for snapshot operations: builds a scaling executor; the key is snapshot, the value a ScalingExecutorBuilder object. Its parameters and corresponding settings are:

    • Name: the name of the thread pool executor, i.e. snapshot.
    • Core: the minimum number of threads in the pool, fixed at 1.
    • Max: the maximum number of threads in the pool, the halfProcMaxAt5 described above.
    • KeepAlive: how long an idle thread above the core count stays alive, fixed at 5 minutes; exposed as `thread_pool.snapshot.keep_alive`.
  • Executor for shard-start operations: builds a scaling executor; the key is fetch_shard_started, the value a ScalingExecutorBuilder object. Its parameters and corresponding settings are:

    • Name: the name of the thread pool executor, i.e. fetch_shard_started.
    • Core: the minimum number of threads in the pool, fixed at 1.
    • Max: the maximum number of threads in the pool, twice the number of available CPUs.
    • KeepAlive: how long an idle thread above the core count stays alive, fixed at 5 minutes; exposed as `thread_pool.fetch_shard_started.keep_alive`.
  • Executor for force-merge operations: builds a fixed executor; the key is force_merge, the value a FixedExecutorBuilder object. Its parameters and corresponding settings are:

    • Settings: the node's configuration settings.
    • Name: the name of the thread pool executor, i.e. force_merge.
    • Size: the fixed number of threads, 1; together with name this builds the setting `thread_pool.force_merge.size`.
    • QueueSize: the size of the blocking queue, -1, i.e. unbounded; exposed as `thread_pool.force_merge.queue_size`.
  • Executor for shard-store fetch operations: builds a scaling executor; the key is fetch_shard_store, the value a ScalingExecutorBuilder object. Its parameters and corresponding settings are:

    • Name: the name of the thread pool executor, i.e. fetch_shard_store.
    • Core: the minimum number of threads in the pool, fixed at 1.
    • Max: the maximum number of threads in the pool, twice the number of available CPUs.
    • KeepAlive: how long an idle thread above the core count stays alive, fixed at 5 minutes; exposed as `thread_pool.fetch_shard_store.keep_alive`.
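The builder map above can be modeled as a small stand-alone sketch. The record types here are simplified stand-ins, not the real ES classes, and search and force_merge are omitted since their builders take different parameters:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of the thread pool builder map described above.
// Scaling ~ ScalingExecutorBuilder, Fixed ~ FixedExecutorBuilder (stand-ins).
public class ThreadPoolBuilders {
    record Scaling(int core, int max, String keepAlive) {}
    record Fixed(int size, int queueSize) {}   // queueSize -1 = unbounded

    static Map<String, Object> buildersFor(int procs) {
        int halfProcMaxAt5  = Math.min(Math.max(procs / 2, 1), 5);
        int halfProcMaxAt10 = Math.min(Math.max(procs / 2, 1), 10);
        int genericMax      = Math.min(Math.max(4 * procs, 128), 512);

        Map<String, Object> builders = new LinkedHashMap<>();
        builders.put("generic",             new Scaling(4, genericMax, "30s"));
        builders.put("index",               new Fixed(procs, 200));
        builders.put("bulk",                new Fixed(procs, 200));
        builders.put("get",                 new Fixed(procs, 1000));
        builders.put("listener",            new Fixed(halfProcMaxAt10, -1));
        builders.put("management",          new Scaling(1, 5, "5m"));
        builders.put("flush",               new Scaling(1, halfProcMaxAt5, "5m"));
        builders.put("refresh",             new Scaling(1, halfProcMaxAt10, "5m"));
        builders.put("warmer",              new Scaling(1, halfProcMaxAt5, "5m"));
        builders.put("snapshot",            new Scaling(1, halfProcMaxAt5, "5m"));
        builders.put("fetch_shard_started", new Scaling(1, 2 * procs, "5m"));
        builders.put("fetch_shard_store",   new Scaling(1, 2 * procs, "5m"));
        return builders;
    }

    public static void main(String[] args) {
        buildersFor(4).forEach((name, b) -> System.out.println(name + " -> " + b));
    }
}
```

Running this for 4 CPUs, as in the local run, reproduces the per-pool values listed above.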

This completes the creation of the org.elasticsearch.threadpool.ThreadPool object.

What the ThreadPool object does

After the ThreadPool object is obtained, it is used to build the NodeClient:

client = new NodeClient(settings, threadPool);

It is also used to build the ResourceWatcherService object:

final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
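Services like these all borrow threads from the single shared pool. A minimal stand-alone analogue of that lookup-by-name pattern (not the real ES API; the pool names and sizes here are illustrative):

```java
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Stand-alone analogue of sharing one ThreadPool across services:
// executors are looked up by pool name and tasks submitted to them.
public class SharedPools {
    private final Map<String, ExecutorService> executors = Map.of(
            "generic", Executors.newCachedThreadPool(),   // scaling-style pool
            "get", Executors.newFixedThreadPool(4));      // fixed-size pool

    // Look up an executor by pool name, failing fast on unknown names.
    ExecutorService executor(String name) {
        ExecutorService e = executors.get(name);
        if (e == null) throw new IllegalArgumentException("no such pool: " + name);
        return e;
    }

    void shutdown() {
        executors.values().forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) throws Exception {
        SharedPools pools = new SharedPools();
        // A service submits its work to the named pool it needs.
        Future<String> f = pools.executor("get").submit(() -> "fetched doc");
        System.out.println(f.get());
        pools.shutdown();
    }
}
```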

There are many other components that use thread pools, such as:

  • IngestService
  • ClusterInfoService
  • MonitorService
  • ActionModule
  • IndicesService
  • NetworkModule
  • TransportService
  • DiscoveryModule
  • NodeService

As you can see, the thread pool is a core component of ElasticSearch. I will explain how the components above function in future articles. For a storage and search system like ElasticSearch, I/O operations are inevitably frequent, so the thread pool's role in these service components is all the more important.

Little’s law

Little's law, which the search thread pool above relies on, describes the relationship between three variables in a stable system:

L = λW

where L is the average number of requests in the system, λ is the arrival rate of requests, and W is the average time to process a request. For example, if 10 requests arrive per second and each takes 1 second to process, then on average 10 requests are in flight at any given time, so 10 threads are needed to keep up. If the processing time of a single request doubles, the number of in-flight requests also doubles, to 20.

Once we understand how processing time affects throughput, it becomes clear that the theoretical upper bound is not necessarily the best thread pool size: the limit also has to account for how long each task takes.

Assuming the JVM can process 1000 tasks in parallel: if each request takes 30 seconds to process, the maximum throughput is about 33.3 requests per second; if each request takes only 500 milliseconds, the application can handle 2000 requests per second.
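The arithmetic in these examples follows directly from L = λW and can be checked with a few lines of Java:

```java
// Little's law: L = lambda * W, rearranged two ways to verify the
// figures quoted in the text above.
public class LittleLaw {
    // Average number of in-flight requests given an arrival rate
    // (requests/sec) and an average service time (sec).
    static double inFlight(double lambda, double serviceTimeSec) {
        return lambda * serviceTimeSec;
    }

    // Maximum sustainable throughput (requests/sec) for a given
    // concurrency limit and average service time (sec).
    static double maxThroughput(int concurrency, double serviceTimeSec) {
        return concurrency / serviceTimeSec;
    }

    public static void main(String[] args) {
        System.out.println(inFlight(10, 1.0));         // 10 requests in flight
        System.out.println(maxThroughput(1000, 30.0)); // ~33.3 requests/sec
        System.out.println(maxThroughput(1000, 0.5));  // 2000 requests/sec
    }
}
```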