
Goal

Use Flink's Elasticsearch connector to process data consumed from Kafka and store it in Elasticsearch.

pom

Pick the Elasticsearch connector that matches the ES version you actually run. The production cluster here is ES 7.

Pick the Kafka connector artifact according to the Scala version in use (the _2.11 suffix below):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

ES configuration

Address resolution

You can wrap the host parsing in your own helper method:

// Cached result, so the node string is parsed only once
private static HttpHost[] httpHostArray;

public static HttpHost[] loadHostArray(String nodes) {
    if (httpHostArray == null) {
        String[] split = nodes.split(",");
        httpHostArray = new HttpHost[split.length];

        for (int i = 0; i < split.length; ++i) {
            // Each item is expected to look like "host:port"
            String[] hostAndPort = split[i].split(":");
            httpHostArray[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]), "http");
        }
    }
    return httpHostArray;
}
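The helper above assumes every entry is a clean "host:port" pair. As a minimal sketch of a more forgiving parser, the class below trims whitespace, skips empty entries, and falls back to a default port. HostParser and DEFAULT_PORT are hypothetical names, and java.net.InetSocketAddress stands in for HttpHost so that the example runs without the Elasticsearch client on the classpath. IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

class HostParser {
    // Hypothetical default, used when an entry has no ":port" part
    static final int DEFAULT_PORT = 9200;

    static List<InetSocketAddress> parse(String nodes) {
        List<InetSocketAddress> hosts = new ArrayList<>();
        for (String item : nodes.split(",")) {
            String trimmed = item.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0 ? DEFAULT_PORT : Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup while parsing
            hosts.add(InetSocketAddress.createUnresolved(host, port));
        }
        return hosts;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("es-1:9200, es-2:9201,es-3,")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```

Each parsed entry could then be turned into new HttpHost(host, port, "http") in the same way as loadHostArray does.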

For a quick test, a hard-coded host list is enough:

List<HttpHost> elsearchHosts = new ArrayList<>();
elsearchHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

Create ElasticsearchSink.Builder

Configuration details:

  • esSinkBuilder.setBulkFlushInterval(3000); // Flush interval in milliseconds; a flush happens at this interval regardless of the two thresholds below
  • esSinkBuilder.setBulkFlushMaxSizeMb(10); // Maximum size of buffered data, in MB, before a flush
  • esSinkBuilder.setBulkFlushMaxActions(1); // Maximum number of buffered actions before a flush
  • esSinkBuilder.setBulkFlushBackoff(true); // Whether to retry failed bulk requests with a backoff
  • esSinkBuilder.setBulkFlushBackoffRetries(2); // Maximum number of backoff retries
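To make the interplay of the three flush triggers concrete, here is a simplified model of a bulk buffer. It is an illustration only, not Flink's or Elasticsearch's actual implementation. BulkBufferSketch and its members are hypothetical names, and the size limit is given in bytes rather than MB.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a bulk buffer; not the real connector code
class BulkBufferSketch {
    private final int maxActions; // plays the role of setBulkFlushMaxActions
    private final long maxBytes;  // plays the role of setBulkFlushMaxSizeMb, in bytes here
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    int flushCount = 0;

    BulkBufferSketch(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    // Buffers one document; flushes as soon as either threshold is reached
    void add(String doc) {
        buffer.add(doc);
        bufferedBytes += doc.getBytes(StandardCharsets.UTF_8).length;
        if (buffer.size() >= maxActions || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    // Stands in for the timer behind setBulkFlushInterval: flushes whatever is buffered
    void onTimer() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        flushCount++;
        buffer.clear();
        bufferedBytes = 0;
    }

    public static void main(String[] args) {
        BulkBufferSketch sketch = new BulkBufferSketch(3, 1024);
        sketch.add("a");
        sketch.add("b");
        sketch.onTimer();             // interval fires: flush 1, with two documents
        sketch.add("c");
        sketch.add("d");
        sketch.add("e");              // third action: flush 2
        sketch.add("x".repeat(2000)); // over the size limit: flush 3
        System.out.println("flushes: " + sketch.flushCount); // prints "flushes: 3"
    }
}
```

In this model, a maximum of 1 action (the value used in this article) means every record is flushed immediately, so the size and interval settings would rarely come into play. A larger value trades latency for fewer bulk requests.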

Configure a failure policy:

  • esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler()); // Built-in handler: re-adds requests that Elasticsearch rejected because its queue was full
  • esSinkBuilder.setFailureHandler(...); // Custom handler that overrides the failure policy, for example to re-add requests rejected because the ES queue is full

ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(elsearchHosts, new ElasticsearchSinkFunction<JSONObject>() {

    private final String INDEX = "test";

    @Override
    public void process(JSONObject jsonObject, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        // Upsert: update the document with this id, or insert it if it does not exist yet
        UpdateRequest updateRequest = new UpdateRequest(INDEX, jsonObject.getString("id"));
        updateRequest.upsert(jsonObject, XContentType.JSON);
        updateRequest.doc(jsonObject, XContentType.JSON);
        // Add the request
        requestIndexer.add(updateRequest);
    }
});

// Custom failure policy
esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest actionRequest, Throwable throwable, int i, RequestIndexer requestIndexer) throws Throwable {
        if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
            // full queue; re-add document for indexing
            requestIndexer.add(actionRequest);
        } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
            // malformed document; simply drop request without failing sink
        } else {
            // for all other failures, fail the sink
            // here the failure is simply rethrown, but users can also choose to throw custom exceptions
            throw throwable;
        }
    }
});

esSinkBuilder.setBulkFlushInterval(3000);
esSinkBuilder.setBulkFlushMaxSizeMb(10);
esSinkBuilder.setBulkFlushBackoff(true);
esSinkBuilder.setBulkFlushBackoffRetries(2);
esSinkBuilder.setBulkFlushMaxActions(1);

// Alternative: use the built-in handler instead of the custom one above.
// A later setFailureHandler call replaces the earlier one, so set only one of them.
// esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
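The custom failure handler boils down to a three-way decision made by searching the exception's cause chain: retry, drop, or fail. The sketch below reproduces that routing with the JDK only, so it can be tried without a cluster. FailureRoutingSketch, Decision, and decide are hypothetical names. RejectedExecutionException and IllegalArgumentException are stand-ins for EsRejectedExecutionException and ElasticsearchParseException, and the local findThrowable only imitates what Flink's ExceptionUtils.findThrowable is used for here.

```java
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;

class FailureRoutingSketch {
    enum Decision { RETRY, DROP, FAIL }

    // Walks the cause chain looking for an exception of the given type
    static <T extends Throwable> Optional<T> findThrowable(Throwable failure, Class<T> type) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return Optional.of(type.cast(t));
            }
        }
        return Optional.empty();
    }

    static Decision decide(Throwable failure) {
        if (findThrowable(failure, RejectedExecutionException.class).isPresent()) {
            return Decision.RETRY; // stand-in for "queue full": re-add the request
        }
        if (findThrowable(failure, IllegalArgumentException.class).isPresent()) {
            return Decision.DROP;  // stand-in for "malformed document": skip it
        }
        return Decision.FAIL;      // anything else should fail the sink
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("bulk failed",
                new RejectedExecutionException("queue full"));
        System.out.println(decide(wrapped));
        System.out.println(decide(new IllegalArgumentException("bad json")));
        System.out.println(decide(new IllegalStateException("node down")));
    }
}
```

Searching the whole cause chain matters because the interesting exception is often wrapped in a more generic one by the time it reaches the handler.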

sink

mapStream.addSink(esSinkBuilder.build()).name("sink");

Kafka configuration

Parameters

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("flink.partition-discovery.interval-millis", "5000");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "5000");

Reading data

Custom Schema

Replace BinLogPojo with your own entity class.

public class BinlogSchema implements DeserializationSchema<BinLogPojo> {

    @Override
    public BinLogPojo deserialize(byte[] bytes) {
        String log = new String(bytes, StandardCharsets.UTF_8);
        return JSON.parseObject(log, BinLogPojo.class);
    }

    @Override
    public boolean isEndOfStream(BinLogPojo binLogPojo) {
        return false;
    }

    @Override
    public TypeInformation<BinLogPojo> getProducedType() {
        return TypeInformation.of(new TypeHint<BinLogPojo>() {});
    }
}
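The schema above lets any parse error propagate, so a single malformed message fails the job. The Flink Kafka connector documentation describes an alternative: return null from deserialize so that the consumer skips the corrupted record. Below is a minimal sketch of that tolerant decoding step using only the JDK. TolerantDecoder and decode are hypothetical names, and the parser argument is a stand-in for a call such as JSON.parseObject(log, BinLogPojo.class).

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class TolerantDecoder {
    // Decodes UTF-8 bytes and applies the parser; returns null for empty or malformed payloads
    static <T> T decode(byte[] bytes, Function<String, T> parser) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        String text = new String(bytes, StandardCharsets.UTF_8);
        try {
            return parser.apply(text);
        } catch (RuntimeException e) {
            // Log and skip instead of failing the whole job
            System.err.println("Skipping malformed record: " + text);
            return null;
        }
    }

    public static void main(String[] args) {
        Integer ok = decode("42".getBytes(StandardCharsets.UTF_8), Integer::parseInt);
        Integer bad = decode("oops".getBytes(StandardCharsets.UTF_8), Integer::parseInt);
        System.out.println(ok + " / " + bad);
    }
}
```

Whether to skip or to fail is a design choice: skipping keeps the pipeline running but silently loses data, so in practice the skipped payloads are usually counted or logged somewhere visible.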

Consuming Kafka

Note: do not return a JSONObject stream directly, otherwise the sink throws java.lang.UnsupportedOperationException. Use String instead, or wrap the data in your own serializable entity class.

FlinkKafkaConsumerBase<BinLogPojo> eventConsumer = new FlinkKafkaConsumer<>(
        TOPIC, new BinlogSchema(), properties)
        .setStartFromLatest();
SingleOutputStreamOperator<JSONObject> mapStream = env.addSource(eventConsumer)
        .map(new MapFunction<BinLogPojo, JSONObject>() {
            @Override
            public JSONObject map(BinLogPojo binLog) throws Exception {
                LinkedHashMap<String, String> binLogData = binLog.getData();
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("status", 0);
                return jsonObject;
            }
        });