This is the third day of my participation in the November Gwen Challenge. Event details: The Last Gwen Challenge of 2021.
Goal
Use Flink's Elasticsearch connector to consume data from Kafka and write it into Elasticsearch.
pom
Configure the Elasticsearch connector according to the ES version you actually run; here I use ES7 in production.
The Kafka connector artifact is chosen according to the Scala version you use (2.11 here):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
ES configuration
Address resolution
You can encapsulate address parsing in a helper method of your own:
private static HttpHost[] httpHostArray;

// Parse a comma-separated "host:port" list, e.g. "node1:9200,node2:9200"
public static HttpHost[] loadHostArray(String nodes) {
    if (httpHostArray == null) {
        String[] split = nodes.split(",");
        httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; ++i) {
            String item = split[i];
            httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");
        }
    }
    return httpHostArray;
}
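Assuming the helper above lives in some utility class (the class name EsUtil and the node addresses below are placeholders), it can be turned into the List<HttpHost> that ElasticsearchSink.Builder expects like this:
// requires java.util.Arrays, java.util.List, org.apache.http.HttpHost
// Hypothetical usage: EsUtil and the node addresses are illustrative
List<HttpHost> elsearchHosts = Arrays.asList(EsUtil.loadHostArray("es-node1:9200,es-node2:9200"));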
If you are just testing, you can simply hard-code the host:
List<HttpHost> elsearchHosts = new ArrayList<>();
elsearchHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
Create an ElasticsearchSink.Builder
Configuration details:
- esSinkBuilder.setBulkFlushInterval(3000); // Bulk flush interval in milliseconds (a flush is also triggered when either threshold below is reached)
- esSinkBuilder.setBulkFlushMaxSizeMb(10); // Maximum size, in MB, of buffered actions before a bulk flush
- esSinkBuilder.setBulkFlushMaxActions(1); // Maximum number of buffered actions before a bulk flush
- esSinkBuilder.setBulkFlushBackoff(true); // Whether to retry failed bulk requests with backoff
- esSinkBuilder.setBulkFlushBackoffRetries(2); // Maximum number of backoff retries
Configure a failure policy:
- esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler()); // Built-in handler that retries requests rejected because the ES bulk queue is full
- esSinkBuilder.setFailureHandler(...); // Or supply a custom ActionRequestFailureHandler to override the failure policy, e.g. re-add the request when ES rejects it because its queue is full (see the code below)
ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(elsearchHosts,
        new ElasticsearchSinkFunction<JSONObject>() {
            private final String INDEX = "test";

            @Override
            public void process(JSONObject jsonObject, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                // Upsert: if the document does not exist yet, insert it; otherwise merge the doc part into the existing document
                UpdateRequest updateRequest = new UpdateRequest(INDEX, jsonObject.getString("id"));
                updateRequest.upsert(jsonObject, XContentType.JSON);
                updateRequest.doc(jsonObject, XContentType.JSON);
                // Hand the request to the sink's bulk processor
                requestIndexer.add(updateRequest);
            }
        });

esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest actionRequest, Throwable throwable, int i, RequestIndexer requestIndexer) throws Throwable {
        if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
            // Bulk queue is full: re-add the document for indexing
            requestIndexer.add(actionRequest);
        } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
            // Malformed document: simply drop the request without failing the sink
        } else {
            // For all other failures, fail the sink.
            // Here the failure is simply rethrown, but users can also choose to throw custom exceptions
            throw throwable;
            // logger.error(throwable.getMessage());
        }
    }
});

esSinkBuilder.setBulkFlushInterval(3000);
esSinkBuilder.setBulkFlushMaxSizeMb(10);
esSinkBuilder.setBulkFlushBackoff(true);
esSinkBuilder.setBulkFlushBackoffRetries(2);
esSinkBuilder.setBulkFlushMaxActions(1);
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler()); // Note: this replaces the custom handler registered above; keep only the setFailureHandler call you actually want
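If your cluster requires authentication, the builder also exposes setRestClientFactory for customizing the underlying REST client. A minimal sketch, assuming HTTP basic auth; the username and password are placeholders and this snippet is not part of the original setup:
// requires org.apache.http.auth.AuthScope, org.apache.http.auth.UsernamePasswordCredentials,
// org.apache.http.impl.client.BasicCredentialsProvider
esSinkBuilder.setRestClientFactory(restClientBuilder -> {
    BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    // Placeholder credentials; replace with your own
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "changeme"));
    restClientBuilder.setHttpClientConfigCallback(
            httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
});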
sink
mapStream.addSink(esSinkBuilder.build()).name("sink");
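Optionally, the same call chain can also give the sink a stable operator uid and its own parallelism; the values below are just examples:
mapStream.addSink(esSinkBuilder.build())
        .name("sink")
        .uid("es-sink")     // example uid, useful for savepoint compatibility
        .setParallelism(2); // example value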
Kafka configuration
Parameters
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("flink.partition-discovery.interval-millis", "5000");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "5000");
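Note that enable.auto.commit and auto.commit.interval.ms only take effect when checkpointing is disabled; with checkpointing enabled, the Flink Kafka consumer commits offsets when checkpoints complete. A minimal sketch of enabling checkpointing (the 60-second interval is just an example):
// requires org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60s (example value); offsets are then committed on completed checkpoints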
Reading the data
Custom Schema
BinLogPojo can be replaced with your own entity class.
public class BinlogSchema implements DeserializationSchema<BinLogPojo> {

    @Override
    public BinLogPojo deserialize(byte[] bytes) {
        String log = new String(bytes, StandardCharsets.UTF_8);
        return JSON.parseObject(log, BinLogPojo.class);
    }

    @Override
    public boolean isEndOfStream(BinLogPojo binLogPojo) {
        return false;
    }

    @Override
    public TypeInformation<BinLogPojo> getProducedType() {
        return TypeInformation.of(new TypeHint<BinLogPojo>() {});
    }
}
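For reference, a minimal BinLogPojo could look like the sketch below. Only getData() returning a LinkedHashMap<String, String> is actually relied on by the code in this post; the other fields are assumptions about a typical binlog message and should be adapted to your own format.
import java.io.Serializable;
import java.util.LinkedHashMap;

// Minimal sketch of the entity class; fields other than `data` are illustrative
public class BinLogPojo implements Serializable {
    private String table;                       // illustrative: source table name
    private String type;                        // illustrative: e.g. INSERT / UPDATE / DELETE
    private LinkedHashMap<String, String> data; // column name -> value, used by the map function below

    public LinkedHashMap<String, String> getData() { return data; }
    public void setData(LinkedHashMap<String, String> data) { this.data = data; }
    public String getTable() { return table; }
    public void setTable(String table) { this.table = table; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
}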
Consuming Kafka
Note: do not return a JSONObject stream directly from the deserialization schema, otherwise the sink submission fails with java.lang.UnsupportedOperationException. Deserialize into a String or your own serializable entity class instead.
FlinkKafkaConsumerBase<BinLogPojo> eventConsumer = new FlinkKafkaConsumer<>(
        TOPIC, new BinlogSchema(), properties)
        .setStartFromLatest();

SingleOutputStreamOperator<JSONObject> mapStream = env.addSource(eventConsumer)
        .map(new MapFunction<BinLogPojo, JSONObject>() {
            @Override
            public JSONObject map(BinLogPojo binLog) throws Exception {
                LinkedHashMap<String, String> binLogData = binLog.getData();
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("status", 0);
                // ... copy the fields you need from binLogData, including the "id" used by the ES sink
                return jsonObject;
            }
        });
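Finally, the job still has to be launched; a one-line sketch (the job name is arbitrary):
env.execute("kafka-to-elasticsearch"); // submit the job; the name is just an example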