Background
At present, the home page recommendation and other user-matching functions of the company's social app [Nuanchat] are built on Elasticsearch (ES), so user data must be synchronized to ES in real time. The project operates ES directly through the JPA framework: whenever user information changes, the data is written to ES immediately via JPA's save method. The consequence is that ES is updated very frequently. During peak periods of user activity, the IO and CPU of the ES servers come under heavy pressure, sometimes to the point of directly affecting online functionality. Optimizing the way we synchronize data to ES therefore became urgent.
Current status
The project's current ES cluster configuration: 2 cores, 4 GB memory, 2 nodes, serving 40,000+ daily active users. During periods of high user activity, CPU utilization on the ES servers stays above 90%. When the company successfully drives traffic through advertising or other marketing campaigns, a small traffic peak appears online and ES server CPU usage can even reach 100%. When that happens, every ES request in the project times out and the service becomes unavailable for a short period.
Optimization scheme
The optimization scheme must satisfy two conditions:
- Solve the online ES service unavailability problem in a short time
- Thoroughly overhaul the ES synchronization mechanism to fundamentally relieve the pressure on the ES servers
Based on this, we optimized in two steps: a server upgrade, and the introduction of Flink + Kafka + ES streaming processing.
Improving the hardware configuration
To resolve the online service unavailability promptly, we upgraded the ES servers from 2-core 4 GB × 2 nodes to 8-core 16 GB × 3 nodes, improving the concurrent processing capacity of the ES service at the hardware level. The effect online was immediately noticeable. Given the current growth of the application, this keeps the risk of ES-induced service unavailability under control in the short term.
Performance of the ES cluster after the upgrade:
Real-time synchronization of user data to ES based on Kafka -> Flink -> ES
- Why choose Kafka -> Flink -> ES streaming processing
To solve the ES server load problem for good, we had to replace the project's pattern of operating ES directly through JPA. The replacement had to meet at least two conditions:
- Fault tolerance: If a process fails, be able to recover and start processing again from where it left off;
- High performance: latency as low as possible and throughput as high as possible (batching ES operations effectively improves ES performance);
Flink's checkpoint mechanism works well together with Kafka offsets, ensuring fault-tolerant recovery and state restoration when the job restarts.
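As a minimal sketch (the checkpoint interval and restart strategy values below are illustrative, not from the original project), enabling checkpointing on the execution environment is what ties Kafka offsets to Flink's fault tolerance: on recovery the job resumes from the offsets stored in the last completed checkpoint.
// Illustrative values only
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 60 s; the Kafka offsets are stored in each checkpoint,
// so a restarted job resumes exactly where it left off
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
// Restart up to 3 times with a 10 s delay so a transient Kafka/ES hiccup does not kill the job
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));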
At Alibaba, Flink clusters have been reported to process 1.7 billion records per second, i.e. trillions of records per day. Flink's ES integration provides batched writes, which greatly reduces the number of requests hitting ES.
Synchronizing ES through a Flink sink also has the advantage of supporting incremental updates: we only need to put the affected attribute fields into a map, and only those fields are updated when syncing to ES, with no risk of overwriting the other fields.
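For example (a sketch with made-up field names), an update event only needs to carry the changed attributes in its fields map, and the resulting UpdateRequest leaves every other field of the document untouched:
Map<String, Object> fields = new HashMap<>();
fields.put("nickname", "new-nickname");                   // hypothetical changed field
fields.put("lastActiveTime", System.currentTimeMillis()); // hypothetical changed field
// Partial update: only nickname and lastActiveTime are rewritten in ES;
// all other fields of the user document keep their current values
requestIndexer.add(new UpdateRequest("user_index", "10001").doc(fields));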
- The optimization process
Step 1: Unify the entry points where user information is updated in the project, and decouple the business logic from ES operations through Kafka messages (see the producer sketch after these steps).
Step 2: Introduce the Flink stream processing framework, which performs distributed computation on the data in memory and then writes it to the external ES system through a sink.
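A rough sketch of the producer side of Step 1 (assuming Spring Kafka's KafkaTemplate is available and MetricEvent exposes setters, e.g. via Lombok; the topic name, index name, and helper method are illustrative): after the user record is saved, the business code only publishes a message and no longer touches ES.
// Hypothetical sketch: publish a MetricEvent instead of calling JPA save() against ES
public void onUserInfoChanged(UserInfo user) {
    MetricEvent event = new MetricEvent();
    event.setIndex("user_index");                      // target ES index (illustrative)
    event.setIndexId(String.valueOf(user.getId()));
    event.setEsType("UPDATE");                         // add / update / delete
    event.setHandlerType("USER");                      // which ModelHandler should process it
    event.setTimestamp(System.currentTimeMillis());
    event.setFields(changedFieldsOf(user));            // hypothetical helper: only the changed attributes
    kafkaTemplate.send("user-metric-topic", JSON.toJSONString(event));
}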
Versions used: Flink 1.14, Kafka 2.0, ES 7.12
POM file
<!-- flink -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.14.0</version>
</dependency>
<!-- kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14.0</version>
</dependency>
<!-- es -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.71</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.8</version>
</dependency>
The Flink main flow reads the target data from Kafka, processes it, and writes it to ES:
// Get all parameters
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
// Prepare the environment
StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
// Read data from Kafka
DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env);
// Read the ES addresses from the configuration file
List<HttpHost> esAddresses = ESSinkUtil.getEsAddresses(parameterTool.get(PropertiesConstants.ELASTICSEARCH_HOSTS));
// Read the bulk flush size from the configuration file: the number of actions per batch
int bulkSize = parameterTool.getInt(PropertiesConstants.ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS, 40);
// Read the sink parallelism from the configuration file
int sinkParallelism = parameterTool.getInt(PropertiesConstants.STREAM_SINK_PARALLELISM, 5);

ESSinkUtil.addSink(esAddresses, bulkSize, sinkParallelism, data,
    (MetricEvent event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) -> {
        logger.info("======================== model {}", JSON.toJSONString(event));
        if (null != event) {
            ModelHandler handler = HandlerFactory.getHandler(event.getHandlerType(), event.getIndex());
            EsTypeEnum type = EsTypeEnum.findByCode(event.getEsType());
            if (null == handler || null == type) {
                logger.error("No processor or operation type specified");
                return;
            }
            try {
                switch (type) {
                    case ADD:
                        requestIndexer.add(Requests.indexRequest(event.getIndex()).id(event.getIndexId())
                                .source(handler.doHandle(event.getFields())));
                        break;
                    case UPDATE:
                        requestIndexer.add(new UpdateRequest(event.getIndex(), event.getIndexId())
                                .doc(handler.doHandle(event.getFields())));
                        break;
                    case DELETE:
                        requestIndexer.add(Requests.deleteRequest(event.getIndex()).id(event.getIndexId()));
                        break;
                    default:
                        logger.error("Currently unsupported operation type");
                }
            } catch (ElasticsearchGenerationException e) {
                logger.error("Data synchronization to ES exception", e);
            }
        }
    },
    parameterTool);
env.execute("flink sync es");
Build the Kafka DataStreamSource that parses the target data out of the Kafka messages:
public static DataStreamSource<MetricEvent> buildSource(StreamExecutionEnvironment env) {
    ParameterTool parameterTool = (ParameterTool) env.getConfig().getGlobalJobParameters();
    KafkaSource<MetricEvent> source = KafkaSource.<MetricEvent>builder()
            // Kafka cluster address
            .setBootstrapServers(parameterTool.getRequired(PropertiesConstants.KAFKA_BROKERS))
            // Kafka topic
            .setTopics(parameterTool.getRequired(PropertiesConstants.METRICS_TOPIC))
            // Kafka consumer group id
            .setGroupId(parameterTool.getRequired(PropertiesConstants.KAFKA_GROUP_ID))
            // Start from the latest offsets when no committed offset exists
            .setStartingOffsets(OffsetsInitializer.latest())
            // Deserialize the message value into the target object
            .setValueOnlyDeserializer(new MetricSchema())
            .build();
    return env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_NAME);
}
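The MetricSchema used above is not listed in the original article; a minimal sketch of such a value-only deserializer could look like this (using fastjson, which is already on the classpath):
public class MetricSchema implements DeserializationSchema<MetricEvent> {

    @Override
    public MetricEvent deserialize(byte[] message) {
        // The producer writes the MetricEvent as a JSON string, so parse it back here
        return JSON.parseObject(new String(message, StandardCharsets.UTF_8), MetricEvent.class);
    }

    @Override
    public boolean isEndOfStream(MetricEvent nextElement) {
        return false; // unbounded Kafka stream, never ends
    }

    @Override
    public TypeInformation<MetricEvent> getProducedType() {
        return TypeInformation.of(MetricEvent.class);
    }
}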
Kafka message body data structure
public class MetricEvent {
/** Index name (mandatory) */
private String index;
/** Index ID (mandatory) */
private String indexId;
/** Processor type HandlerTypeEnum (custom processing) */
private String handlerType;
/** Operation type EsTypeEnum (mandatory: add/update/delete) */
private String esType;
/** Message sending time (optional) */
private Long timestamp;
/** Specific content (mandatory) */
private Map<String, Object> fields;
}
The ES sink configuration requires special attention. Batch submission greatly reduces the number of ES requests, but we must also consider that during periods of low user activity the Kafka messages may never reach the maximum batch size, which would delay the synchronization of user data to ES.
Flink's ElasticsearchSink therefore provides two batch flush triggers that we combine here: by number of buffered actions and by maximum flush interval.
public static void addSink(ParameterTool parameterTool, SingleOutputStreamOperator<EsCommonModel> data, ElasticsearchSinkFunction<EsCommonModel> func) throws MalformedURLException {
// Read the ES addresses from the configuration file
List<HttpHost> esAddresses = getEsAddresses(parameterTool);
// Read the bulk flush size from the configuration file: the number of actions per batch
int bulkSize = parameterTool.getInt(PropertiesConstants.ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS, DEFAULT_ELASTICSEARCH_BULK_SIZE);
// Read the bulk flush interval from the configuration file: the maximum time (ms) between batch flushes
int bulkInterval = parameterTool.getInt(PropertiesConstants.ELASTICSEARCH_BULK_FLUSH_MAX_INTERVAL, DEFAULT_ELASTICSEARCH_BULK_INTERVAL);
// Read the number of parallel sinks from the configuration file
int sinkParallelism = parameterTool.getInt(PropertiesConstants.ELASTICSEARCH_STREAM_SINK_PARALLELISM, DEFAULT_ELASTICSEARCH_SINK_PARALLELISM);
ElasticsearchSink.Builder<EsCommonModel> esSinkBuilder = new ElasticsearchSink.Builder<>(esAddresses, func);
esSinkBuilder.setBulkFlushMaxActions(bulkSize);
esSinkBuilder.setBulkFlushInterval(bulkInterval);
esSinkBuilder.setFailureHandler(new RetryRequestFailureHandler());
data.addSink(esSinkBuilder.build()).setParallelism(sinkParallelism);
}
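The RetryRequestFailureHandler set above is a custom class whose implementation is not shown in the original article. A minimal sketch of what such an ActionRequestFailureHandler could look like (the retry policy here is an assumption, not the project's actual code):
public class RetryRequestFailureHandler implements ActionRequestFailureHandler {

    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
        // If ES rejected the bulk request because its queue was full, re-queue the action so it is retried
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            indexer.add(action);
        } else {
            // Any other failure is rethrown; Flink's restart strategy then recovers the job from the last checkpoint
            throw failure;
        }
    }
}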
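The configuration keys read via PropertiesConstants are not listed in the original article. Purely as an illustration (the actual key names and values in the project may differ), the job's properties file could look like this:
# Hypothetical key names and values, matching the PropertiesConstants used above
elasticsearch.hosts=http://es-node1:9200,http://es-node2:9200,http://es-node3:9200
elasticsearch.bulk.flush.max.actions=40
elasticsearch.bulk.flush.max.interval=5000
elasticsearch.stream.sink.parallelism=5
kafka.brokers=kafka-node1:9092,kafka-node2:9092
kafka.group.id=flink-sync-es
metrics.topic=user-metric-topic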
Configuration optimization suggestions
The number of Kafka partitions determines the achievable Flink parallelism. Ideally the number of Kafka partitions equals the Flink parallelism, and the Flink parallelism equals the number of shards of the target ES index, so that data can be written fully in parallel.
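A sketch of what this alignment means in code (partition and shard counts are illustrative):
// Assume the Kafka topic has 6 partitions and the target ES index has 6 shards
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Source, operators, and sink all run with parallelism 6, so each subtask
// consumes exactly one Kafka partition and the sink writes in parallel
env.setParallelism(6);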
Performance improvement
Since the Flink pipeline went live, the ES service has remained stable, and CPU usage no longer spikes when system traffic surges.
Directions for further optimization
Currently the project actively sends Kafka messages whenever user information changes, which is itself an intrusion into the business code. Synchronizing user data to ES based on Flink CDC would completely decouple the business from the ES synchronization. Readers who are interested can look into what Flink CDC is on their own. Once this follow-up optimization is completed, we will share how to synchronize user data to ES with Flink CDC.
About the author
Dongming, back-end engineer at the technology center of Hangzhou Xiaoyu Technology.
This article was produced by the Xiaoyu technology team, which owns its copyright. You are welcome to reprint or use its content for non-commercial purposes such as sharing and communication, provided you credit it as "content reprinted from the Xiaoyu technology team". It may not be reproduced or used commercially without permission.