
The business scenario

The service consumes a Kafka subscription and, after matching and split computation, pushes each message to multiple external users' HTTP server addresses. The fan-out ratio is 1:N.

Performance bottleneck

During load testing we found that, with Kafka's sequential consumption, a single client timeout easily backs up the entire pipeline. The forwarding tasks have strict real-time requirements, so the pipeline needed optimization.

Performance optimization, step by step

1. Consume Kafka in batches

When pushing to external clients, the bottleneck is mainly IO time spent waiting for the client's response. Spring's Kafka support does poll records in batches by default, but the framework then hands them to the listener one at a time on a single thread. So the first step is to receive messages as whole batches, which opens the door to parallel processing. The reference code is as follows:

@Data
@Configuration
public class KafkaBatchConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private String batchGroupId;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.properties.session.timeout.ms}")
    private Integer sessionTimeOut;

    @Value("${spring.kafka.consumer.properties.partition:1}")
    private Integer partition;
    /** Batch consumer configuration. */
    @Bean
    public Map<String, Object> consumerBatchConfigs() {
        Map<String, Object> props = new HashMap<>(16);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, sessionTimeOut);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOut);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, batchGroupId);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /** Batch listener container factory. */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerBatchConfigs()));
        // Set the consumer concurrency to the partition count; threads beyond the number of partitions would sit idle
        factory.setConcurrency(partition);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // The number of records per poll is set via ConsumerConfig.MAX_POLL_RECORDS_CONFIG in the consumer properties above
        factory.setBatchListener(true);
        return factory;
    }
}
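
For completeness, here is a minimal sketch of a matching batch listener, assuming the factory above; the topic name and forwardBatch() are placeholders for the real business logic:

import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class BatchForwardListener {

    // "push-topic" and forwardBatch() are placeholders for the real topic and business logic
    @KafkaListener(topics = "push-topic", containerFactory = "batchFactory")
    public void onBatch(List<String> messages, Acknowledgment ack) {
        log.info("polled a batch of {} messages", messages.size());
        forwardBatch(messages);
        // MANUAL_IMMEDIATE ack: commit the offset only after the whole batch is forwarded
        ack.acknowledge();
    }

    private void forwardBatch(List<String> messages) {
        // matching, splitting and HTTP forwarding happen here (sections 2 and 3)
    }
}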

2. Thread pool forwarding processing

Apache HttpClient is used as a singleton, but it has a connection-pool concept, which lets a batch of requests be forwarded concurrently. It is worth understanding how the singleton relates to the pool: although the HttpClient instance is a single object, it handles different connections independently through a PoolingHttpClientConnectionManager. If, say, 10 HTTP requests execute at the same time, each first obtains a connection from the PoolingHttpClientConnectionManager's pool, and runs once its connection is acquired.

At the same time, to keep Kafka consumption reliable, the offset must be committed manually only after HTTP forwarding succeeds. We bring in CountDownLatch from multi-threaded programming to coordinate this, ensuring every message is delivered before the commit. The flow becomes: the consumer main thread computes the N clients that the N messages must be forwarded to, then sets up a CountDownLatch and waits for all forwarding tasks to complete. This also stops over-consumption from overloading the clients.

@Slf4j
public class HttpClientSyncServiceImpl implements HttpClientService {

    private CloseableHttpClient closeableHttpClient;

    private HttpClientConfig httpClientConfig;

    private String pemBody = null;

    public HttpClientSyncServiceImpl() {
    }

    public HttpClientSyncServiceImpl(HttpClientConfig httpClientConfig) {
        this.httpClientConfig = httpClientConfig;
        try {
            log.info("init http client start, default config is {}", httpClientConfig);
            SSLContext sslcontext = HttpClientUtils.buildSSLContext();

            SSLConnectionSocketFactory trustAll = HttpClientUtils.buildSSLSocketFactory(sslcontext);
            // Configure support for both HTTP and HTTPS
            // An httpClient instance can register only one SSLConnectionSocketFactory for HTTPS
            Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create().
                    register("http", PlainConnectionSocketFactory.getSocketFactory()).
                    register("https", trustAll).build();
            // Initialize the connection manager
            PoolingHttpClientConnectionManager poolConnManager = buildPoolConnManager(socketFactoryRegistry);
            RequestConfig config = buildHttpClient(poolConnManager);
            closeableHttpClient = HttpClients.custom()
                    // Set connection pool management
                    .setConnectionManager(poolConnManager)
                    .setDefaultRequestConfig(config).build();
            log.info("init default http client finish");
        } catch (Exception e) {
            log.error("", e); }}public HttpClientSyncServiceImpl(HttpClientConfig httpClientConfig, String pem) {
        this.pemBody = pem;
        this.httpClientConfig = httpClientConfig;
        try {
            log.info("build new httpclient {}", httpClientConfig);
            closeableHttpClient = buildHttpClientByKeyStore(pem);
            log.info("init new http client finish");
        } catch (Exception e) {
            log.error("", e); }}private PoolingHttpClientConnectionManager buildPoolConnManager(Registry<ConnectionSocketFactory> socketFactoryRegistry) {
        PoolingHttpClientConnectionManager poolConnManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
        // Maximum number of simultaneous connections across the pool
        poolConnManager.setMaxTotal(httpClientConfig.getPollMaxTotal());
        // Maximum number of connections per route
        poolConnManager.setDefaultMaxPerRoute(httpClientConfig.getPollMaxPeerRouter());
        return poolConnManager;
    }

    private CloseableHttpClient buildHttpClientByKeyStore(String pem) {
        try {
            SSLContext sslContext = HttpClientUtils.buildSSLContext(pem);
            SSLConnectionSocketFactory sslConnectionSocketFactory = HttpClientUtils.buildSSLSocketFactory(sslContext);
            // Initialize the connection manager
            Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create().
                    register("http", PlainConnectionSocketFactory.getSocketFactory()).
                    register("https", sslConnectionSocketFactory).build();
            PoolingHttpClientConnectionManager poolConnManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
            // Max total connections; for these per-client pools it equals the per-route limit
            poolConnManager.setMaxTotal(httpClientConfig.getPollMaxPeerRouter());
            poolConnManager.setDefaultMaxPerRoute(httpClientConfig.getPollMaxPeerRouter());
            RequestConfig config = HttpClientUtils.buildRequestConfig(httpClientConfig);
            return HttpClients.custom()
                    // Set connection pool management
                    .setConnectionManager(poolConnManager)
                    .setDefaultRequestConfig(config).build();
        } catch (Exception e) {
            log.error("build httpclient failed, use default", e);
            return null;
        }
    }

    private RequestConfig buildHttpClient(PoolingHttpClientConnectionManager poolConnManager) {
        return RequestConfig.custom()
                .setConnectTimeout(httpClientConfig.getConnectTimeout())
                .setConnectionRequestTimeout(httpClientConfig.getConnectionRequestTimeout())
                .setSocketTimeout(httpClientConfig.getResponseTimeout())
                .build();
    }
}
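
Below is a minimal sketch of the consume-forward-ack flow described above. ForwardTask, the post() helper, and the executor wiring are illustrative, not the exact production code:

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

public class BatchForwarder {

    private final ExecutorService forwardPool;          // shared forwarding pool
    private final HttpClientService httpClientService;  // the pooled client shown above

    public BatchForwarder(ExecutorService forwardPool, HttpClientService httpClientService) {
        this.forwardPool = forwardPool;
        this.httpClientService = httpClientService;
    }

    /**
     * Fan out one consumed batch to its matched clients and block until every
     * forward finishes, so the caller can safely acknowledge the Kafka offset.
     */
    public void forwardAndWait(List<ForwardTask> tasks) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(tasks.size());
        for (ForwardTask task : tasks) {
            forwardPool.execute(() -> {
                try {
                    // post() stands in for the actual HttpClientService call
                    httpClientService.post(task.getUrl(), task.getBody());
                } finally {
                    latch.countDown();
                }
            });
        }
        // The consumer main thread parks here until all N forwards complete,
        // which also throttles consumption so clients are not overloaded.
        latch.await();
    }
}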

3. Optimize the parallelism of the matching computation

Load testing showed that the matching query for a single message takes 2-4 ms, so matching 100 records serially takes about 0.4 s in total. This cost can be cut with Java 8's parallelStream. Note that parallelStream processes elements concurrently in no particular order, so the per-element work must be thread-safe and must not rely on ordering.
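
A minimal sketch of the change, where match() stands in for the 2-4 ms lookup and ForwardTask is illustrative:

import java.util.List;
import java.util.stream.Collectors;

// A serial stream() here would take ~0.4 s for a 100-record batch.
// parallelStream() runs the lookups on the common ForkJoinPool instead;
// elements are processed concurrently in no particular order, so match()
// must be thread-safe and must not depend on input order.
private List<ForwardTask> matchBatch(List<String> messages) {
    return messages.parallelStream()
            .map(this::match)
            .collect(Collectors.toList());
}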

Multiple problems continue to arise at this point:

1. JVM Full GC appears

[Full GC (Allocation Failure) 507M->382M(512M), 0.8299883 secs] [Eden: 0.0B(25.0M)->0.0B(69.0M) Survivors: 0.0B->0.0B Heap: 507.3M(512.0M)->382.7M(512.0M)], [Metaspace: 109047K->109047K(1153024K)]

The debug environment's 512 MB heap was simply too small; raising it to 2 GB fixed the issue.
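
For reference, a sketch of the corresponding launch options (assuming G1 on JDK 8; forwarder.jar is a placeholder):

java -Xms2g -Xmx2g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -jar forwarder.jar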

Full GC trigger conditions

  • Concurrent mode failure: the CMS collector has the same concept. If the old generation fills up while G1's concurrent marking is still in progress, G1 abandons the marking cycle.
  • Promotion failure: after the concurrent cycle, mixed collections (which accompany young-generation collections) clean up old-generation space. If they reclaim space more slowly than it is consumed and the old generation runs out, promotion fails.
  • Evacuation failure: during a young-generation collection, there is not enough room in the Survivor and Old regions for all surviving objects. This is close to fatal, since almost no space is left, and triggering a Full GC at this point is reasonable.
  • Humongous allocation failure: avoid creating large objects as much as possible, especially humongous ones (in G1, objects larger than half a region).

2. The forwarding queue stalls; some forwards wait until earlier ones complete

During the load test, the forwarding queue would stall and then resume after a pause. Yet no push task returned late, no thread was tied up, and CPU, memory, and GC all looked normal.

Cause: the tasks the thread pool executes are IO-intensive, so most CPU time is idle and system resources are underused. When a large burst of requests arrives and the submitted count exceeds coreSize, the extra tasks are placed in the wait queue and only run after the core threads finish their current work.

A thread pool dispatches work in the order corePool -> workQueue -> maxPool.

Therefore we change the pool's behavior: when the core threads are all busy, create a new worker thread instead of queueing, so messages never accumulate in an in-memory queue.

The modification: track the number of in-flight tasks as they execute. At submit time, first check whether the in-flight count has reached the core size; if it is still below the core size, let a core thread pick the task up directly.

  • When thread A submits a task, it first increments the in-flight counter on the Executor wrapper; the pool then runs the task asynchronously, and only when the task finishes does the counter decrement. Meanwhile thread A, having submitted the task, waits for the asynchronous work to finish.
  • When the asynchronous work completes, it notifies thread A, which then returns.

If the decrement were instead done in PollExecutor's afterExecute hook, it would run on the pool's worker thread, which has already notified A to accept new work once the task completed. A task submitted in that window would see a stale count, so the pool would conclude that no core thread is idle and create a new thread.
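
Here is a minimal sketch of the "threads before queue" idea, implemented with a queue that refuses offers while the pool can still grow, similar in spirit to what Tomcat's ThreadPoolExecutor does; the class and method names are ours:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * A queue that reports itself "full" while the pool can still grow, so that
 * ThreadPoolExecutor creates a new worker instead of queueing the task.
 * Wire-up: create the queue, pass it to the ThreadPoolExecutor constructor,
 * then call setParent(pool).
 */
public class EagerTaskQueue extends LinkedBlockingQueue<Runnable> {

    private transient ThreadPoolExecutor parent;

    public void setParent(ThreadPoolExecutor parent) {
        this.parent = parent;
    }

    @Override
    public boolean offer(Runnable r) {
        if (parent != null && parent.getPoolSize() < parent.getMaximumPoolSize()) {
            // Refusing the offer makes execute() fall through to addWorker(),
            // which starts a new thread up to maximumPoolSize
            return false;
        }
        return super.offer(r);
    }
}

One caveat: Tomcat additionally tracks a submitted-task count so the offer is only refused when no worker is idle, and it catches RejectedExecutionException to retry the enqueue when the pool is already at its maximum.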

Finding: thread creation exceeds the limit when multiple threads call execute concurrently

Watching the thread names, we saw multiple threads calling execute concurrently while the thread count kept climbing. Old threads were not reused, and the thread queue filled up.

Principles of thread reclamation in thread pools

The pool's running state is not set explicitly by the user; it is maintained internally as the pool runs. A single variable holds two values: the run state (runState) and the thread count (workerCount). That is, the implementation packs the maintenance of these two key parameters together, as the following code shows:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static int runStateOf(int c)     { return c & ~CAPACITY; } // current run state (high 3 bits)
private static int workerCountOf(int c)  { return c & CAPACITY; }  // current worker count (low 29 bits)
private static int ctlOf(int rs, int wc) { return rs | wc; }       // pack state and count into one ctl value

This AtomicInteger, ctl, controls both the pool's run state and its effective thread count: runState lives in the high 3 bits and workerCount in the low 29 bits, and the two never interfere. Packing both values into one variable avoids inconsistencies between them when making decisions and saves holding a lock just to keep the pair consistent. Reading the thread pool source, you'll see the run state and thread count are very often checked together; the pool also exposes methods for users to read both. All of this is done with bit operations, which is quicker than computing the values separately.

The pool must manage thread lifecycles and reclaim threads that have not run for a long time. It holds references to its worker threads in a HashSet; adding and removing references controls their lifecycle. The crucial question is how to tell whether a thread is running.

Worker extends AQS and uses it to implement a non-reentrant exclusive lock. AQS is used instead of ReentrantLock deliberately, so that the lock state reflects whether the thread is currently executing a task:

  1. Once a Worker acquires its exclusive lock via lock(), it is currently executing a task.
  2. A thread that is executing a task should not be interrupted.
  3. A Worker that does not hold the exclusive lock is idle, not processing a task, and may safely be interrupted.
  4. When shutdown or tryTerminate runs, the pool calls interruptIdleWorkers to interrupt idle threads; interruptIdleWorkers uses tryLock to test whether a worker is idle, and idle workers can be safely reclaimed.

This property is exercised during thread reclamation.
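
The relevant JDK 8 source (abridged from ThreadPoolExecutor) shows tryLock acting as the idleness probe:

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // tryLock() succeeds only if the Worker's exclusive lock is free,
                // i.e. the worker is idle and not running a task
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }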

While running, the thread pool's job is to hold references to the right number of threads and keep the JVM from reclaiming them; when the pool decides a thread should be reclaimed, it simply drops the reference. After a Worker is created it polls continuously, fetching tasks to execute. Core threads may wait indefinitely for a task, while non-core threads must obtain one within a limited time. When a Worker fails to get a task, i.e. the fetched task is null, the loop ends and the Worker removes its own reference from the pool.

So a worker thread that has finished is not released the moment its task ends. For example, with 20 core threads and a maximum of 40, once the pool balloons to 40 there is a delay while a batch of threads is reclaimed before processing continues. At this point we suspected that multiple threads submitting via execute too quickly was what kept driving the thread count up.

The tests found:

  • Each batch submission creates new threads, even though the number of tasks per batch is far smaller than the core thread count; and new threads keep appearing even with a 5 s interval between batches.

  • When a batch of tasks is submitted in a loop with a 0.1 ns delay between submissions, no new threads are generated.

So the submission rate is clearly tied to whether the pool allocates new threads or reuses old ones.

Solution Attempt 1:

Drop the CountDownLatch "starting gun" coordination and use the pool's active count to check whether the batch is done. ThreadPoolExecutor exposes getActiveCount(), which reports the (approximate) number of threads actively executing tasks; when it drops to 0 the pool is considered fully drained, which is equivalent to the latch reaching zero.
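
A minimal sketch of this attempt; the executor, forward() and Acknowledgment wiring are illustrative:

void forwardWithActiveCountWait(List<ForwardTask> tasks,
                                ThreadPoolExecutor executor,
                                Acknowledgment ack) throws InterruptedException {
    tasks.forEach(task -> executor.execute(() -> forward(task)));
    // getActiveCount() is documented as an approximate count of threads
    // actively executing tasks -- it says nothing about queued tasks or
    // about when threads are actually released, which is why this failed
    while (executor.getActiveCount() > 0) {
        Thread.sleep(5);
    }
    ack.acknowledge();
}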

Answers from the source code

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        // workerCountOf(c) The number of working threads in the thread pool is smaller than the number of core threads
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // isRunning(c): is the executor still in the RUNNING state?
        // workQueue.offer enqueues the task, returning false if the queue is full;
        // a false return falls through to creating a new thread below.
        // The pool's producer-consumer model is built on workQueue: when consumption
        // (slowed by locks and CAS) falls behind submission, the queue grows until
        // offers eventually fail.
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

Retesting showed the pool still adding threads rather than reusing old ones. The active count has nothing to do with when the JVM actually releases threads.

Solution Attempt 2:

Thread pool queue model

ThreadPoolExecutor's execute method proceeds in four steps:

  1. If fewer than corePoolSize threads are running, create a new thread to run the task (this step takes the global lock).
  2. If at least corePoolSize threads are running, add the task to the BlockingQueue.
  3. If the task cannot be added to the BlockingQueue (the queue is full), create a new thread to run it (this step again takes the global lock).
  4. If creating that thread would push the running thread count past maximumPoolSize, reject the task and invoke RejectedExecutionHandler.rejectedExecution().

The overall design of these steps aims to avoid taking the global lock in execute() whenever possible (that lock would be a serious scalability bottleneck). Once the pool has warmed up (the running thread count has reached corePoolSize), almost every execute() call takes step 2, which needs no global lock.

The thread pool is a producer-consumer model; production and consumption flow through the queue the pool is configured with. The RunnableTaskQueue, the blocking queue that holds tasks waiting to run, can be chosen from:

  • ArrayBlockingQueue: a bounded blocking queue backed by an array, ordering elements FIFO (first in, first out).
  • LinkedBlockingQueue: a blocking queue backed by a linked list. A maximum length can be specified, but it is unbounded by default. It orders elements FIFO and typically has higher throughput than ArrayBlockingQueue; the static factory method Executors.newFixedThreadPool() uses this queue.
  • SynchronousQueue: a synchronous blocking queue with no actual storage. Every insert must wait for another thread to remove the element, otherwise the insert blocks. Throughput is usually higher than LinkedBlockingQueue; the static factory method Executors.newCachedThreadPool() uses this queue.
  • PriorityBlockingQueue: an unbounded, heap-based blocking priority queue.

Relating this back to our business flow: Kafka consumes 100 messages per batch for forwarding, and the pool size is set to 100. The first batch forwards successfully; then the second batch is consumed while the core threads are not yet all released, so its tasks land in the wait queue. Hence the stalls. Now consider a backlog scenario: if the wait queue is made smaller, new threads are created until maximumPoolSize is exceeded and tasks get rejected; and if the controlled total thread count always exceeds the number of tasks to run, new threads keep being created while old ones are destroyed as keepAlive expires. So consider changing the pool's queue model: a SynchronousQueue has no actual storage, so a task can only be handed over once a worker is ready to take it. The core threads then consume tasks continuously, and there is no full queue forcing new threads into existence.
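
A minimal sketch of the resulting pool configuration; the sizes are illustrative, and the blocking rejection handler is one way to obtain the "wait for a free worker" behavior described above:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sizes; tune against the consumed batch size as the tests below show
ThreadPoolExecutor forwardPool = new ThreadPoolExecutor(
        100, 100, 60L, TimeUnit.SECONDS,
        // No storage: execute() hands each task straight to an idle worker,
        // so there is no queue in which messages can silently pile up
        new SynchronousQueue<>(),
        // All workers busy: block the submitting (consumer) thread until one
        // frees up, instead of rejecting or growing a queue
        (task, pool) -> {
            try {
                pool.getQueue().put(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });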

Thread pool performance test records under various parameters

  1. Each submission to the pool sleeps 1 ns, with CountDownLatch "starting gun" coordination.

No new threads are created in the thread pool, and requests per minute remain steady at around 6.6K.

  2. CountDownLatch coordination, with no sleep between submissions.

Threads keep being added to the pool, and tasks back up into the queue; throughput is unstable and keeps dropping.

  3. SynchronousQueue, consuming batches of 250.

It stabilized at about 13.2K.

  4. SynchronousQueue, consuming batches of 10.

Stable at 17K, CPU utilization at 73%

  5. SynchronousQueue, consuming batches of 50.

Stable at 7K with high CPU usage, around 85%.

  6. SynchronousQueue, consuming batches of 8.

Throughput fluctuates heavily but is high, averaging 22K; we suspect the push queue is sensitive to latency spikes. CPU usage around 50%.

  7. SynchronousQueue, consuming batches of 4.

High throughput, averaging 25K; again presumably sensitive to latency spikes, with low CPU usage of 28%. Evidently, for IO-intensive tasks, adding CPU brings no further gain.

Summary of performance optimization points

  1. Optimized the Kafka consumption mode: batch consumption raises concurrency.
  2. Combined multithreading with HttpClient's PoolingHttpClientConnectionManager to forward concurrently and raise the forwarding rate.
  3. Changed the thread pool's task queue from LinkedBlockingQueue to SynchronousQueue, so threads are reused instead of endlessly created.

Next focus

Even with the optimized thread pool, HttpClient's forwarding throughput is only around 400 messages per second, which is still not high. The next step is to rewrite the forwarding layer with Netty/Vertx.