Goal

Use Flink's Elasticsearch connector to consume data from Kafka, process it, and store the results in Elasticsearch.


Pick the Elasticsearch connector that matches the version of your cluster; the production cluster here runs ES 7.

For Kafka, pick the connector artifact built for the Scala version your project uses.


ES configuration

Parsing the host list

You can wrap the host parsing in a helper method of your own:

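import org.apache.http.HttpHost;

private static HttpHost[] httpHostArray; // cached after the first call

// Parses "host1:9200,host2:9200" into HttpHost objects (assumes plain HTTP)
public static HttpHost[] loadHostArray(String nodes) {
    if (httpHostArray == null) {
        String[] split = nodes.split(",");
        httpHostArray = new HttpHost[split.length];
        for (int i = 0; i < split.length; ++i) {
            String[] hostAndPort = split[i].split(":");
            httpHostArray[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]), "http");
        }
    }
    return httpHostArray;
}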
For a quick test, you can build the list by hand:

List<HttpHost> elsearchHosts = new ArrayList<>();
elsearchHosts.add(new HttpHost("", 9200, "http")); // fill in your Elasticsearch host
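The `loadHostArray` helper above caches its result in a static field and assumes each entry is a well-formed `host:port` pair. As a hedged, dependency-free sketch of the same parsing with basic validation, the block below swaps in a hypothetical `HostPort` class for `HttpHost` so it runs without the Elasticsearch client on the classpath; it trims whitespace, tolerates a trailing comma, and rejects entries without a port.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free sketch of "host:port,host:port" parsing.
// HostPort is a hypothetical stand-in for org.apache.http.HttpHost.
final class HostParser {

    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Splits on commas, trims each entry, and fails fast on entries without a numeric port
    static List<HostPort> parse(String nodes) {
        List<HostPort> result = new ArrayList<>();
        for (String item : nodes.split(",")) {
            String entry = item.trim();
            if (entry.isEmpty()) {
                continue; // tolerate stray commas such as "es1:9200,,es2:9200"
            }
            int colon = entry.lastIndexOf(':');
            if (colon <= 0 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + entry + "'");
            }
            String host = entry.substring(0, colon);
            // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) for a bad port
            int port = Integer.parseInt(entry.substring(colon + 1));
            result.add(new HostPort(host, port));
        }
        return result;
    }
}
```

To hand the result to `ElasticsearchSink.Builder`, which expects a `List<HttpHost>`, you would convert each entry with `new HttpHost(hp.host, hp.port, "http")`.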
Create an ElasticsearchSink.Builder

Configuration details:

  • esSinkBuilder.setBulkFlushInterval(3000); // Flush buffered requests every 3000 ms. This works alongside the two thresholds below: whichever condition is met first triggers the flush
  • esSinkBuilder.setBulkFlushMaxSizeMb(10); // Flush once the buffered requests reach 10 MB
  • esSinkBuilder.setBulkFlushMaxActions(1); // Flush once this many requests are buffered; 1 means every record is sent on its own (no batching)
  • esSinkBuilder.setBulkFlushBackoff(true); // Retry with backoff when a bulk request is temporarily rejected by the cluster
  • esSinkBuilder.setBulkFlushBackoffRetries(2); // Maximum number of backoff retries
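With `setBulkFlushMaxActions(1)`, every record is flushed on its own, so the interval and size settings never get a chance to batch anything. The block below is a simplified, hypothetical model in plain Java (not Flink's or Elasticsearch's implementation) that shows how the three thresholds combine: whichever fires first triggers the flush. One simplification is called out in the code: the real bulk processor flushes on a background timer, while this model only checks the interval when a request arrives.

```java
// Simplified model (not Flink code) of how the bulk-flush thresholds interact:
// a flush happens as soon as ANY enabled trigger fires; a value <= 0 disables a trigger.
final class BulkFlushModel {
    private final int maxActions;
    private final long maxSizeBytes;
    private final long intervalMillis;

    private int bufferedActions = 0;
    private long bufferedBytes = 0;
    private long lastFlushMillis;

    BulkFlushModel(int maxActions, int maxSizeMb, long intervalMillis, long startMillis) {
        this.maxActions = maxActions;
        this.maxSizeBytes = maxSizeMb <= 0 ? -1 : maxSizeMb * 1024L * 1024L;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = startMillis;
    }

    // Buffers one request and returns true if this call triggered a flush.
    // Simplification: the interval is only checked here, not by a background timer.
    boolean add(long requestBytes, long nowMillis) {
        bufferedActions++;
        bufferedBytes += requestBytes;
        boolean countHit = maxActions > 0 && bufferedActions >= maxActions;
        boolean sizeHit = maxSizeBytes > 0 && bufferedBytes >= maxSizeBytes;
        boolean timeHit = intervalMillis > 0 && nowMillis - lastFlushMillis >= intervalMillis;
        if (countHit || sizeHit || timeHit) {
            bufferedActions = 0;
            bufferedBytes = 0;
            lastFlushMillis = nowMillis;
            return true;
        }
        return false;
    }
}
```

For real batching, a larger max-actions value lets the 10 MB and 3000 ms limits decide when to flush, trading a little latency for fewer bulk requests.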

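Configure a failure policy (only one handler is active; if none is set, the default handler fails the sink on any error):

  • esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler()); // Built-in: re-adds requests rejected because the ES queue is full, fails the sink on other errors
  • esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() { ... }); // Custom policy, e.g. re-add requests when the ES queue is full and drop malformed documents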
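import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;

ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(elsearchHosts,
        new ElasticsearchSinkFunction<JSONObject>() {
            private final String INDEX = "test";

            @Override
            public void process(JSONObject jsonObject, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                // Update the document with this id; if it does not exist yet, insert the JSON as an upsert document
                UpdateRequest updateRequest = new UpdateRequest(INDEX, jsonObject.getString("id"));
                updateRequest.upsert(jsonObject, XContentType.JSON);
                updateRequest.doc(jsonObject, XContentType.JSON);
                requestIndexer.add(updateRequest); // hand the request to the bulk processor
            }
        });
esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest actionRequest, Throwable throwable, int restStatusCode, RequestIndexer requestIndexer) throws Throwable {
        if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
            // full queue; re-add the document for indexing (this sink only sends UpdateRequests)
            requestIndexer.add((UpdateRequest) actionRequest);
        } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
            // malformed document; simply drop the request without failing the sink
        } else {
            // for all other failures, fail the sink; to keep the job running, log the error here instead
            throw throwable;
        }
    }
});
esSinkBuilder.setBulkFlushInterval(3000);
// Alternative to the custom handler above: only the last setFailureHandler call takes effect
// esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());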
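Because `findThrowable` searches the whole cause chain, a rejection wrapped inside another exception is still recognized. The block below is a hedged, plain-Java sketch of the same three-way decision (retry, drop, fail) that runs without Elasticsearch or Flink: `ParseFailure` is a hypothetical stand-in for `ElasticsearchParseException`, `RejectedExecutionException` stands in for `EsRejectedExecutionException`, and the local `findThrowable` only mimics Flink's `ExceptionUtils.findThrowable`.

```java
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;

// Plain-Java sketch of the failure-handler decision. ParseFailure is a hypothetical stand-in for
// ElasticsearchParseException; RejectedExecutionException stands in for EsRejectedExecutionException.
final class FailurePolicy {

    enum Action { RETRY, DROP, FAIL }

    static final class ParseFailure extends RuntimeException {
        ParseFailure(String message) { super(message); }
    }

    // Walks the cause chain, similar in spirit to Flink's ExceptionUtils.findThrowable
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> type) {
        Throwable current = t;
        while (current != null) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    static Action decide(Throwable failure) {
        if (findThrowable(failure, RejectedExecutionException.class).isPresent()) {
            return Action.RETRY; // queue full: re-add the request
        } else if (findThrowable(failure, ParseFailure.class).isPresent()) {
            return Action.DROP;  // malformed document: skip it
        }
        return Action.FAIL;      // anything else: fail the sink
    }
}
```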
Kafka configuration


Properties properties = new Properties();
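The line above only creates the `Properties` object. As a minimal sketch, assuming a plain consumer setup, these are standard Kafka consumer keys a `FlinkKafkaConsumer` typically needs; the helper name `consumerProperties`, the broker list, and the group id are hypothetical placeholders. With Flink's default start position (the group's committed offsets), `auto.offset.reset` only applies when the group has no committed offset yet.

```java
import java.util.Properties;

// Hypothetical helper: replace the placeholder values with your own cluster and consumer group
final class KafkaProps {
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        // Comma-separated broker list, e.g. "kafka1:9092,kafka2:9092" (placeholder)
        properties.setProperty("bootstrap.servers", bootstrapServers);
        // Consumer group whose committed offsets are used as the start position
        properties.setProperty("group.id", groupId);
        // Where to start when the group has no committed offset yet
        properties.setProperty("auto.offset.reset", "latest");
        return properties;
    }
}
```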
Reading the data

Custom Schema

Replace BinLogPojo with your own entity class.

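import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.nio.charset.StandardCharsets;

public class BinlogSchema implements DeserializationSchema<BinLogPojo> {

    // Decode each Kafka record as UTF-8 JSON and map it onto BinLogPojo with fastjson
    @Override
    public BinLogPojo deserialize(byte[] bytes) {
        String log = new String(bytes, StandardCharsets.UTF_8);
        return JSON.parseObject(log, BinLogPojo.class);
    }

    // A Kafka topic is unbounded, so no record ever marks the end of the stream
    @Override
    public boolean isEndOfStream(BinLogPojo binLogPojo) {
        return false;
    }

    @Override
    public TypeInformation<BinLogPojo> getProducedType() {
        return TypeInformation.of(new TypeHint<BinLogPojo>() {});
    }
}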
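Returning `false` from `isEndOfStream` is what keeps the source reading an unbounded Kafka topic. The block below is a hypothetical, plain-Java model (not Flink's code) of the contract the Kafka source applies to a deserialization schema, as we understand the Flink Kafka connector docs: a `null` result is skipped instead of failing the job, and an element for which `isEndOfStream` returns `true` stops consumption without being emitted. `MiniSchema`, `Utf8Schema`, and `drain` are illustrative names only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of how a source drives a deserialization schema (not Flink code):
// null results are skipped, and the first element for which isEndOfStream is true stops consumption.
final class SchemaContract {

    interface MiniSchema<T> {
        T deserialize(byte[] bytes);
        boolean isEndOfStream(T element);
    }

    // Example schema: UTF-8 text, returning null for blank (unusable) records
    static final class Utf8Schema implements MiniSchema<String> {
        @Override
        public String deserialize(byte[] bytes) {
            String s = new String(bytes, StandardCharsets.UTF_8);
            return s.trim().isEmpty() ? null : s;
        }

        @Override
        public boolean isEndOfStream(String element) {
            return false; // an unbounded topic never ends
        }
    }

    static <T> List<T> drain(List<byte[]> records, MiniSchema<T> schema) {
        List<T> out = new ArrayList<>();
        for (byte[] record : records) {
            T element = schema.deserialize(record);
            if (element == null) {
                continue; // skip records the schema could not turn into an element
            }
            if (schema.isEndOfStream(element)) {
                break; // the end-of-stream element itself is not emitted
            }
            out.add(element);
        }
        return out;
    }
}
```

Applied to `BinlogSchema`, this suggests catching the fastjson parsing exception in `deserialize` and returning `null` to skip a malformed binlog message rather than failing the job.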

Consuming Kafka

Note: do not have the Kafka source emit a JSONObject stream directly, otherwise the sink fails with java.lang.UnsupportedOperationException. Deserialize into a String or your own entity class instead, and convert to JSONObject afterwards.

FlinkKafkaConsumerBase<BinLogPojo> eventConsumer = new FlinkKafkaConsumer<>(
        TOPIC, new BinlogSchema(), properties);
SingleOutputStreamOperator<JSONObject> mapStream = env.addSource(eventConsumer)
        .map(new MapFunction<BinLogPojo, JSONObject>() {
            @Override
            public JSONObject map(BinLogPojo binLog) throws Exception {
                LinkedHashMap<String, String> binLogData = binLog.getData();
                JSONObject jsonObject = new JSONObject();
                // Assumption: copy every column; the ES sink reads the document id from the "id" field
                jsonObject.putAll(binLogData);
                return jsonObject;
            }
        });
mapStream.addSink(esSinkBuilder.build()); // write the mapped records to Elasticsearch
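Since the Elasticsearch sink builds each `UpdateRequest` from `jsonObject.getString("id")`, every mapped record needs an `id`; without one, the problem would only surface later inside the sink. The block below is a hedged, plain-Java sketch of that check. The helper name `toDocument` is hypothetical, and it assumes the binlog row image is a `Map<String, String>` such as the `LinkedHashMap` returned by `getData()`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns a binlog row image into the document map sent to Elasticsearch,
// failing early if the "id" field that the sink uses as the document id is missing.
final class BinlogToDocument {
    static Map<String, Object> toDocument(Map<String, String> row) {
        if (row == null) {
            throw new IllegalArgumentException("binlog record has no data");
        }
        String id = row.get("id");
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("binlog row has no 'id' column: " + row.keySet());
        }
        // LinkedHashMap keeps the column order of the source row
        return new LinkedHashMap<>(row);
    }
}
```

Inside `map`, you could then call `jsonObject.putAll(BinlogToDocument.toDocument(binLog.getData()))` so that bad rows fail in the map step rather than in the sink.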