The Bulk API lets a single API call carry multiple document operations, such as indexing, updating, or deleting documents. The API is defined as follows:

  • public final BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException
  • public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener<BulkResponse> listener)

1. BulkRequest in detail



Let’s take a look at the core attributes and typical methods:

  • final List<DocWriteRequest> requests = new ArrayList<>(): the list of requests. DocWriteRequest subclasses include IndexRequest, UpdateRequest, and DeleteRequest.
  • private final Set<String> indices = new HashSet<>(): the indices targeted by the entries in requests.
  • private List<Object> payloads = null: the payloads.
  • protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT: the timeout, which applies to the Bulk request as a whole.
  • private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT: the waitForActiveShards setting, which applies to the Bulk request as a whole; a waitForActiveShards value set on an individual request takes precedence.
  • private RefreshPolicy refreshPolicy = RefreshPolicy.NONE: the refresh policy.
  • private long sizeInBytes = 0: the size of the entire Bulk request.

Requests are added to a BulkRequest through its add methods.
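To make the bookkeeping behind these attributes concrete, here is a toy model written only with the JDK. It is not the real BulkRequest: the class ToyBulkRequest, its Item type, and the way the size is computed (UTF-8 byte length of the source only) are all assumptions made for illustration. The accessor names numberOfActions() and estimatedSizeInBytes() mirror methods of the same name on the real class.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy stand-in for BulkRequest (hypothetical, for illustration only). */
final class ToyBulkRequest {

    /** Stand-in for a DocWriteRequest: op type, target index, optional source JSON. */
    static final class Item {
        final String opType;
        final String index;
        final String source; // null for operations without a payload, e.g. delete

        Item(String opType, String index, String source) {
            this.opType = opType;
            this.index = index;
            this.source = source;
        }
    }

    private final List<Item> requests = new ArrayList<>();
    private final Set<String> indices = new HashSet<>();
    private long sizeInBytes = 0;

    /** Records the item, remembers its index, and grows the size estimate. */
    ToyBulkRequest add(Item item) {
        requests.add(item);
        indices.add(item.index);
        if (item.source != null) {
            // Assumption: only the source bytes are counted in this toy model.
            sizeInBytes += item.source.getBytes(StandardCharsets.UTF_8).length;
        }
        return this;
    }

    int numberOfActions() { return requests.size(); }

    Set<String> indices() { return indices; }

    long estimatedSizeInBytes() { return sizeInBytes; }

    public static void main(String[] args) {
        ToyBulkRequest bulk = new ToyBulkRequest()
            .add(new Item("index", "twitter", "{\"a\":1}"))
            .add(new Item("delete", "twitter", null));
        System.out.println(bulk.numberOfActions() + " actions on " + bulk.indices());
    }
}
```

The point of the sketch is only that each add call updates three things at once: the request list, the set of target indices, and the running size.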

The Bulk API request format is as follows:

POST _bulk
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

The request format (REST) is defined as follows:

  • The Content-Type of the POST request is application/x-ndjson.
  • Each operation takes up to two lines, and every line, including the last one, ends with a newline character \n (which may be preceded by \r).
  • The first line is the metadata, in the form { "opType" : { metadata } }.
  • The second line is the payload (optional). For an index operation, for example, the payload is the IndexRequest#source field.
  • opType can be index, create, update, or delete.
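The format rules above can be sketched as a small body builder. This is a minimal illustration using only the JDK; the class BulkBodyBuilder and its methods are hypothetical and not part of any Elasticsearch client, and the sketch assumes index names, types, and IDs that need no JSON escaping.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that assembles the newline-delimited body of a _bulk request. */
final class BulkBodyBuilder {
    private final List<String> lines = new ArrayList<>();

    /** Adds a metadata line followed by its payload line (index, create, update). */
    BulkBodyBuilder add(String opType, String index, String type, String id, String payloadJson) {
        lines.add(metadata(opType, index, type, id));
        lines.add(payloadJson);
        return this;
    }

    /** Adds a delete operation: a metadata line with no payload line. */
    BulkBodyBuilder delete(String index, String type, String id) {
        lines.add(metadata("delete", index, type, id));
        return this;
    }

    // Renders { "opType" : { metadata } } on a single line.
    private static String metadata(String opType, String index, String type, String id) {
        return "{\"" + opType + "\":{\"_index\":\"" + index
            + "\",\"_type\":\"" + type + "\",\"_id\":\"" + id + "\"}}";
    }

    /** Terminates every line, including the last one, with a newline. */
    String build() {
        StringBuilder sb = new StringBuilder();
        for (String line : lines) {
            sb.append(line).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String body = new BulkBodyBuilder()
            .add("index", "test", "_doc", "1", "{\"field1\":\"value1\"}")
            .delete("test", "_doc", "2")
            .add("update", "test", "_doc", "1", "{\"doc\":{\"field2\":\"value2\"}}")
            .build();
        System.out.print(body);
    }
}
```

The resulting string would be sent as the POST body with Content-Type application/x-ndjson.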

Metadata common to all operations (index, create, update, and delete):

  1) _index: index name
  2) _type: type name
  3) _id: document ID
  4) routing: routing value
  5) parent
  6) version: data version number
  7) version_type: version type

Operation-specific metadata:

  1. index | create
     1) pipeline
  2. update
     1) retry_on_conflict: number of retries when an update conflicts
     2) _source: field filtering

Payload:

  1. index | create: the payload is the _source field.
  2. update: the payload is a partial doc, upsert, or script.
  3. delete: no payload.

The main reason the request format is designed as metadata plus payload is that the receiving node (the first node that receives the request) only needs to parse the metadata and can then forward each request directly to the corresponding data node.
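The benefit of the metadata-plus-payload layout can be illustrated with a small scanner that looks only at metadata lines and treats payload lines as opaque text. This is a rough sketch using only the JDK, not how Elasticsearch itself is implemented: the class BulkLineScanner is hypothetical, and it uses a regular expression where a real node would use a JSON parser and would also read fields such as the index and routing value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical scanner that reads only the metadata lines of a _bulk body. */
final class BulkLineScanner {
    // Matches the op type key at the start of a metadata line, e.g. { "index" : ...
    private static final Pattern OP =
        Pattern.compile("^\\s*\\{\\s*\"(index|create|update|delete)\"");

    /** Returns the op types in order; payload lines are skipped without being parsed. */
    static List<String> opTypes(String body) {
        List<String> ops = new ArrayList<>();
        String[] lines = body.split("\r?\n");
        int i = 0;
        while (i < lines.length) {
            if (lines[i].trim().isEmpty()) { // tolerate blank lines
                i++;
                continue;
            }
            Matcher m = OP.matcher(lines[i]);
            if (!m.find()) {
                throw new IllegalArgumentException("not a metadata line: " + lines[i]);
            }
            String op = m.group(1);
            ops.add(op);
            // delete has no payload line; every other op is followed by exactly one
            i += op.equals("delete") ? 1 : 2;
        }
        return ops;
    }

    public static void main(String[] args) {
        String body = "{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n"
            + "{ \"field1\" : \"value1\" }\n"
            + "{ \"delete\" : { \"_index\" : \"test\", \"_id\" : \"2\" } }\n";
        System.out.println(opTypes(body));
    }
}
```

Because the payload line is never inspected, its size has no effect on the cost of deciding where an item should go, which is the design point the paragraph above makes.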

3.1 Versioning

Each Bulk item has its own version, which is carried in the metadata of that item.

3.2 Routing

Routing is specified per Bulk item and takes effect for that item only.

3.3 waitForActiveShards

BulkRequest#waitForActiveShards can be set to require a minimum number of shard copies to be active before the Bulk request is executed.
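Since version and routing travel in each item's metadata line, two items in the same Bulk request can carry different values. The sketch below renders such a metadata line with only the JDK. The class BulkActionLine is hypothetical, the field values are made up for illustration, and the code assumes keys and string values that need no JSON escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that renders one metadata line: { "opType" : { metadata } }. */
final class BulkActionLine {

    /** Numbers are written unquoted; every other value is written as a quoted string. */
    static String render(String opType, Map<String, Object> metadata) {
        StringBuilder sb = new StringBuilder("{\"").append(opType).append("\":{");
        boolean first = true;
        for (Map.Entry<String, Object> e : metadata.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(e.getKey()).append("\":");
            Object v = e.getValue();
            if (v instanceof Number) {
                sb.append(v);
            } else {
                sb.append('"').append(v).append('"');
            }
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the fields in insertion order.
        Map<String, Object> meta = new LinkedHashMap<>();
        meta.put("_index", "test");
        meta.put("_type", "_doc");
        meta.put("_id", "1");
        meta.put("routing", "user1");          // per-item routing value
        meta.put("version", 3L);               // per-item version
        meta.put("version_type", "external");  // per-item version type
        System.out.println(render("index", meta));
    }
}
```

A second item in the same body could use a different routing value or version simply by rendering its own metadata line.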

4. Bulk demo

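import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

// EsClient and buildTwitter are the author's own helpers; they are not shown in this article.
public static final void testBulk() {
    RestHighLevelClient client = EsClient.getClient();
    try {
        // One index operation and one update operation, sent in a single Bulk request.
        IndexRequest indexRequest = new IndexRequest("twitter", "_doc", "12")
                .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk"));
        UpdateRequest updateRequest = new UpdateRequest("twitter", "_doc", "11")
                .doc(new IndexRequest("twitter", "_doc", "11")
                        .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk update")));

        BulkRequest request = new BulkRequest();
        request.add(indexRequest);
        request.add(updateRequest);

        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

        // Each item succeeds or fails independently, so check them one by one.
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) {
                BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                System.out.println(failure);
                continue;
            }
            DocWriteResponse itemResponse = bulkItemResponse.getResponse();
            if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                    || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                IndexResponse indexResponse = (IndexResponse) itemResponse;
                System.out.println(indexResponse);
            } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                System.out.println(updateResponse);
            } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                System.out.println(deleteResponse);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        EsClient.close(client);
    }
}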

That wraps up this look at the Bulk API.


I am Weige, keen on systematic analysis of mainstream Java middleware. Follow the public account "middleware interest circle": reply "column" to get the column navigation, or reply "data" to get the author's learning mind map.