Update By Query API

https://www.elastic.co/guide/en/elasticsearch/reference/5.5/docs-update-by-query.html#docs-update-by-query


edit

The simplest usage of _update_by_query just performs an update on every document in the index without changing the source. This is useful to pick up a new property or some other online mapping change. Here is the API:

POST twitter/_update_by_query?conflicts=proceed

That will return something like this:

{
  "took" : 147,
  "timed_out": false,
  "updated": 120,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "total": 120,
  "failures" : [ ]
}

_update_by_query gets a snapshot of the index when it starts and indexes what it finds using internal versioning. That means that you’ll get a version conflict if the document changes between the time when the snapshot was taken and when the index request is processed. When the versions match the document is updated and the version number is incremented.

Note

Since internal versioning does not support the value 0 as a valid version number, documents with version equal to zero cannot be updated using _update_by_query and will fail the request.

All update and query failures cause the _update_by_query to abort and are returned in the failures of the response. The updates that have been performed still stick. In other words, the process is not rolled back, only aborted. While the first failure causes the abort, all failures that are returned by the failing bulk request are returned in the failures element; therefore it’s possible for there to be quite a few failed entities.

If you want to simply count version conflicts not cause the _update_by_queryto abort you can set conflicts=proceed on the url or "conflicts": "proceed"in the request body. The first example does this because it is just trying to pick up an online mapping change and a version conflict simply means that the conflicting document was updated between the start of the _update_by_queryand the time when it attempted to update the document. This is fine because that update will have picked up the online mapping update.

Back to the API format, you can limit _update_by_query to a single type. This will only update tweet documents from the twitter index:

POST twitter/tweet/_update_by_query?conflicts=proceed

You can also limit _update_by_query using the Query DSL. This will update all documents from the twitter index for the user kimchy:

POST twitter/_update_by_query?conflicts=proceed
{
  "query": { 
    "term": {
      "user": "kimchy"
    }
  }
}

The query must be passed as a value to the query key, in the same way as the Search API. You can also use the q parameter in the same way as the search api.

So far we’ve only been updating documents without changing their source. That is genuinely useful for things like picking up new properties but it’s only half the fun. _update_by_query supports a script object to update the document. This will increment the likes field on all of kimchy’s tweets:

POST twitter/_update_by_query
{
  "script": {
    "inline": "ctx._source.likes++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
}

Just as in Update API you can set ctx.op to change the operation that is executed:

noop
Set ctx.op = "noop" if your script decides that it doesn’t have to make any changes. That will cause _update_by_query to omit that document from its updates. This no operation will be reported in the noop counter in the response body.
delete
Set ctx.op = "delete" if your script decides that the document must be deleted. The deletion will be reported in the deleted counter in the response body.

Setting ctx.op to anything else is an error. Setting any other field in ctx is an error.

Note that we stopped specifying conflicts=proceed. In this case we want a version conflict to abort the process so we can handle the failure.

This API doesn’t allow you to move the documents it touches, just modify their source. This is intentional! We’ve made no provisions for removing the document from its original location.

It’s also possible to do this whole thing on multiple indexes and multiple types at once, just like the search API:

POST twitter,blog/tweet,post/_update_by_query

If you provide routing then the routing is copied to the scroll query, limiting the process to the shards that match that routing value:

POST twitter/_update_by_query?routing=1

By default _update_by_query uses scroll batches of 1000. You can change the batch size with the scroll_size URL parameter:

POST twitter/_update_by_query?scroll_size=100

_update_by_query can also use the Ingest Node feature by specifying a pipeline like this:

PUT _ingest/pipeline/set-foo
{
  "description" : "sets foo",
  "processors" : [ {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
  } ]
}
POST twitter/_update_by_query?pipeline=set-foo

URL Parametersedit

In addition to the standard parameters like pretty, the Update By Query API also supports refreshwait_for_completionwait_for_active_shards, and timeout.

Sending the refresh will update all shards in the index being updated when the request completes. This is different than the Index API’s refreshparameter which causes just the shard that received the new data to be indexed.

If the request contains wait_for_completion=false then Elasticsearch will perform some preflight checks, launch the request, and then return a taskwhich can be used with Tasks APIs to cancel or get the status of the task. Elasticsearch will also create a record of this task as a document at .tasks/task/${taskId}. This is yours to keep or remove as you see fit. When you are done with it, delete it so Elasticsearch can reclaim the space it uses.

wait_for_active_shards controls how many copies of a shard must be active before proceeding with the request. See here for details. timeout controls how long each write request waits for unavailable shards to become available. Both work exactly how they work in the Bulk API.

requests_per_second can be set to any positive decimal number (1.461000, etc) and throttles rate at which _update_by_query issues batches of index operations by padding each batch with a wait time. The throttling can be disabled by setting requests_per_second to -1.

The throttling is done by waiting between batches so that scroll that_update_by_query uses internally can be given a timeout that takes into account the padding. The padding time is the difference between the batch size divided by the requests_per_second and the time spent writing. By default the batch size is 1000, so if the requests_per_second is set to 500:

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - delete_time = 2 seconds - .5 seconds = 1.5 seconds

Since the batch is issued as a single _bulk request large batch sizes will cause Elasticsearch to create many requests and then wait for a while before starting the next set. This is "bursty" instead of "smooth". The default is -1.

Response bodyedit

The JSON response looks like this:

{
  "took" : 639,
  "updated": 0,
  "batches": 1,
  "version_conflicts": 2,
  "retries": {
    "bulk": 0,
    "search": 0
  }
  "throttled_millis": 0,
  "failures" : [ ]
}
took
The number of milliseconds from start to end of the whole operation.
updated
The number of documents that were successfully updated.
batches
The number of scroll responses pulled back by the the update by query.
version_conflicts
The number of version conflicts that the update by query hit.
retries
The number of retries attempted by update-by-query. bulk is the number of bulk actions retried and search is the number of search actions retried.
throttled_millis
Number of milliseconds the request slept to conform to requests_per_second.
failures
Array of all indexing failures. If this is non-empty then the request aborted because of those failures. See conflicts for how to prevent version conflicts from aborting the operation.

Works with the Task APIedit

You can fetch the status of all running update-by-query requests with the Task API:

GET _tasks?detailed=true&actions=*byquery

The responses looks like:

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/update/byquery",
          "status" : {    
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": {
              "bulk": 0,
              "search": 0
            }
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}

this object contains the actual status. It is just like the response json with the important addition of the total field. total is the total number of operations that the reindex expects to perform. You can estimate the progress by adding the updatedcreated, and deleted fields. The request will finish when their sum is equal to the total field.

With the task id you can look up the task directly:

GET /_tasks/taskId:1

The advantage of this API is that it integrates with wait_for_completion=falseto transparently return the status of completed tasks. If the task is completed and wait_for_completion=false was set on it them it’ll come back with aresults or an error field. The cost of this feature is the document thatwait_for_completion=false creates at .tasks/task/${taskId}. It is up to you to delete that document.

Works with the Cancel Task APIedit

Any Update By Query can be canceled using the Task Cancel API:

POST _tasks/task_id:1/_cancel

The task_id can be found using the tasks API above.

Cancellation should happen quickly but might take a few seconds. The task status API above will continue to list the task until it is wakes to cancel itself.

Rethrottlingedit

The value of requests_per_second can be changed on a running update by query using the _rethrottle API:

POST _update_by_query/task_id:1/_rethrottle?requests_per_second=-1

The task_id can be found using the tasks API above.

Just like when setting it on the _update_by_query API requests_per_second can be either -1 to disable throttling or any decimal number like 1.7 or 12 to throttle to that level. Rethrottling that speeds up the query takes effect immediately but rethrotting that slows down the query will take effect on after completing the current batch. This prevents scroll timeouts.

Manual slicingedit

Update-by-query supports Sliced Scroll allowing you to manually parallelize the process relatively easily:

POST twitter/_update_by_query
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "script": {
    "inline": "ctx._source['extra'] = 'test'"
  }
}
POST twitter/_update_by_query
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "script": {
    "inline": "ctx._source['extra'] = 'test'"
  }
}

Which you can verify works with:

GET _refresh
POST twitter/_search?size=0&q=extra:test&filter_path=hits.total

Which results in a sensible total like this one:

{
  "hits": {
    "total": 120
  }
}

Automatic slicingedit

You can also let update-by-query automatically parallelize using Sliced Scrollto slice on _uid:

POST twitter/_update_by_query?refresh&slices=5
{
  "script": {
    "inline": "ctx._source['extra'] = 'test'"
  }
}

Which you also can verify works with:

POST twitter/_search?size=0&q=extra:test&filter_path=hits.total

Which results in a sensible total like this one:

{
  "hits": {
    "total": 120
  }
}

Adding slices to _update_by_query just automates the manual process used in the section above, creating sub-requests which means it has some quirks:

  • You can see these requests in the Tasks APIs. These sub-requests are "child" tasks of the task for the request with slices.
  • Fetching the status of the task for the request with slices only contains the status of completed slices.
  • These sub-requests are individually addressable for things like cancellation and rethrottling.
  • Rethrottling the request with slices will rethrottle the unfinished sub-request proportionally.
  • Canceling the request with slices will cancel each sub-request.
  • Due to the nature of slices each sub-request won’t get a perfectly even portion of the documents. All documents will be addressed, but some slices may be larger than others. Expect larger slices to have a more even distribution.
  • Parameters like requests_per_second and size on a request with slices are distributed proportionally to each sub-request. Combine that with the point above about distribution being uneven and you should conclude that the using size with slices might not result in exactly size documents being `_update_by_query`ed.
  • Each sub-requests gets a slightly different snapshot of the source index though these are all taken at approximately the same time.

Picking the number of slicesedit

At this point we have a few recommendations around the number of slicesto use (the max parameter in the slice API if manually parallelizing):

  • Don’t use large numbers. 500 creates fairly massive CPU thrash.
  • It is more efficient from a query performance standpoint to use some multiple of the number of shards in the source index.
  • Using exactly as many shards as are in the source index is the most efficient from a query performance standpoint.
  • Indexing performance should scale linearly across available resources with the number of slices.
  • Whether indexing or query performance dominates that process depends on lots of factors like the documents being reindexed and the cluster doing the reindexing.

Pick up a new propertyedit

Say you created an index without dynamic mapping, filled it with data, and then added a mapping value to pick up more fields from the data:

PUT test
{
  "mappings": {
    "test": {
      "dynamic": false,   
      "properties": {
        "text": {"type": "text"}
      }
    }
  }
}

POST test/test?refresh
{
  "text": "words words",
  "flag": "bar"
}
POST test/test?refresh
{
  "text": "words words",
  "flag": "foo"
}
PUT test/_mapping/test   
{
  "properties": {
    "text": {"type": "text"},
    "flag": {"type": "text", "analyzer": "keyword"}
  }
}

This means that new fields won’t be indexed, just stored in _source.

This updates the mapping to add the new flag field. To pick up the new field you have to reindex all documents with it.

Searching for the data won’t find anything:

POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total" : 0
  }
}

But you can issue an _update_by_query request to pick up the new mapping:

POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total" : 1
  }
}

You can do the exact same thing when adding a field to a multifield.

Delete API

https://www.elastic.co/guide/en/elasticsearch/reference/5.5/docs-delete.html


edit

The delete API allows to delete a typed JSON document from a specific index based on its id. The following example deletes the JSON document from an index called twitter, under a type called tweet, with id valued 1:

DELETE /twitter/tweet/1

The result of the above delete operation is:

{
    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 2
    },
    "found" : true,
    "_index" : "twitter",
    "_type" : "tweet",
    "_id" : "1",
    "_version" : 2,
    "result": "deleted"
}

Versioningedit

Each document indexed is versioned. When deleting a document, the versioncan be specified to make sure the relevant document we are trying to delete is actually being deleted and it has not changed in the meantime. Every write operation executed on a document, deletes included, causes its version to be incremented.

Routingedit

When indexing using the ability to control the routing, in order to delete a document, the routing value should also be provided. For example:

DELETE /twitter/tweet/1?routing=kimchy

The above will delete a tweet with id 1, but will be routed based on the user. Note, issuing a delete without the correct routing, will cause the document to not be deleted.

When the _routing mapping is set as required and no routing value is specified, the delete api will throw a RoutingMissingException and reject the request.

Parentedit

The parent parameter can be set, which will basically be the same as setting the routing parameter.

Note that deleting a parent document does not automatically delete its children. One way of deleting all child documents given a parent’s id is to use the Delete By Query API to perform a index with the automatically generated (and indexed) field _parent, which is in the format parent_type#parent_id.

When deleting a child document its parent id must be specified, otherwise the delete request will be rejected and a RoutingMissingException will be thrown instead.

Automatic index creationedit

The delete operation automatically creates an index if it has not been created before (check out the create index API for manually creating an index), and also automatically creates a dynamic type mapping for the specific type if it has not been created before (check out the put mapping API for manually creating type mapping).

Distributededit

The delete operation gets hashed into a specific shard id. It then gets redirected into the primary shard within that id group, and replicated (if needed) to shard replicas within that id group.

Wait For Active Shardsedit

When making delete requests, you can set the wait_for_active_shardsparameter to require a minimum number of shard copies to be active before starting to process the delete request. See here for further details and a usage example.

Refreshedit

Control when the changes made by this request are visible to search. See ?refresh.

Timeoutedit

The primary shard assigned to perform the delete operation might not be available when the delete operation is executed. Some reasons for this might be that the primary shard is currently recovering from a store or undergoing relocation. By default, the delete operation will wait on the primary shard to become available for up to 1 minute before failing and responding with an error. The timeout parameter can be used to explicitly specify how long it waits. Here is an example of setting it to 5 minutes:

DELETE /twitter/tweet/1?timeout=5m


Get API

The get API allows to get a typed JSON document from the index based on its id. The following example gets a JSON document from an index called twitter, under a type called tweet, with id valued 0:

GET twitter/tweet/

The result of the above get operation is:

{
    "_index" : "twitter",
    "_type" : "tweet",
    "_id" : "0",
    "_version" : 1,
    "found": true,
    "_source" : {
        "user" : "kimchy",
        "date" : "2009-11-15T14:12:12",
        "likes": 0,
        "message" : "trying out Elasticsearch"
    }
}

The above result includes the _index_type_id and _version of the document we wish to retrieve, including the actual _source of the document if it could be found (as indicated by the found field in the response).

The API also allows to check for the existence of a document using HEAD, for example:

HEAD twitter/tweet/0

Realtime

By default, the get API is realtime, and is not affected by the refresh rate of the index (when data will become visible for search). If a document has been updated but is not yet refreshed, the get API will issue a refresh call in-place to make the document visible. This will also make other documents changed since the last refresh visible. In order to disable realtime GET, one can set the realtime parameter to false.

Optional Type

The get API allows for _type to be optional. Set it to _all in order to fetch the first document matching the id across all types.

Source filtering

By default, the get operation returns the contents of the _source field unless you have used the stored_fields parameter or if the _source field is disabled. You can turn off _source retrieval by using the _source parameter:

GET twitter/tweet/0?_source=false

If you only need one or two fields from the complete _source, you can use the _source_include & _source_exclude parameters to include or filter out that parts you need. This can be especially helpful with large documents where partial retrieval can save on network overhead. Both parameters take a comma separated list of fields or wildcard expressions. Example:

GET twitter/tweet/0?_source_include=*.id&_source_exclude=entities

If you only want to specify includes, you can use a shorter notation:

GET twitter/tweet/0?_source=*.id,retweeted

Stored Fields

The get operation allows specifying a set of stored fields that will be returned by passing the stored_fields parameter. If the requested fields are not stored, they will be ignored. Consider for instance the following mapping:

PUT twitter
{
   "mappings": {
      "tweet": {
         "properties": {
            "counter": {
               "type": "integer",
               "store": false
            },
            "tags": {
               "type": "keyword",
               "store": true
            }
         }
      }
   }
}

Now we can add a document:

PUT twitter/tweet/1
{
    "counter" : 1,
    "tags" : ["red"]
}
  1. and try to retrieve it:
GET twitter/tweet/1?stored_fields=tags,counter

The result of the above get operation is:

{
   "_index": "twitter",
   "_type": "tweet",
   "_id": "1",
   "_version": 1,
   "found": true,
   "fields": {
      "tags": [
         "red"
      ]
   }
}

Field values fetched from the document it self are always returned as an array. Since the counter field is not stored the get request simply ignores it when trying to get the stored_fields.

It is also possible to retrieve metadata fields like _routing and _parent fields:

PUT twitter/tweet/2?routing=user1
{
    "counter" : 1,
    "tags" : ["white"]
}
GET twitter/tweet/2?routing=user1&stored_fields=tags,counter

The result of the above get operation is:

{
   "_index": "twitter",
   "_type": "tweet",
   "_id": "2",
   "_version": 1,
   "_routing": "user1",
   "found": true,
   "fields": {
      "tags": [
         "white"
      ]
   }
}

Also only leaf fields can be returned via the stored_field option. So object fields can’t be returned and such requests will fail.

Getting the _source directly

Use the /{index}/{type}/{id}/_source endpoint to get just the _source field of the document, without any additional content around it. For example:

GET twitter/tweet/1/_source

You can also use the same source filtering parameters to control which parts of the _source will be returned:

GET twitter/tweet/1/_source?_source_include=*.id&_source_exclude=entities'

Note, there is also a HEAD variant for the _source endpoint to efficiently test for document _source existence. An existing document will not have a _source if it is disabled in the mapping.

HEAD twitter/tweet/1/_source

Routing

When indexing using the ability to control the routing, in order to get a document, the routing value should also be provided. For example:

GET twitter/tweet/2?routing=user1

The above will get a tweet with id 2, but will be routed based on the user. Note, issuing a get without the correct routing, will cause the document not to be fetched.

Preference

Controls a preference of which shard replicas to execute the get request on. By default, the operation is randomized between the shard replicas.

The preference can be set to:

_primary
The operation will go and be executed only on the primary shards.
_local
The operation will prefer to be executed on a local allocated shard if possible.
Custom (string) value
A custom value will be used to guarantee that the same shards will be used for the same custom value. This can help with "jumping values" when hitting different shards in different refresh states. A sample value can be something like the web session id, or the user name.

Refresh

The refresh parameter can be set to true in order to refresh the relevant shard before the get operation and make it searchable. Setting it to trueshould be done after careful thought and verification that this does not cause a heavy load on the system (and slows down indexing).

Distributed

The get operation gets hashed into a specific shard id. It then gets redirected to one of the replicas within that shard id and returns the result. The replicas are the primary shard and its replicas within that shard id group. This means that the more replicas we will have, the better GET scaling we will have.

Versioning support

You can use the version parameter to retrieve the document only if its current version is equal to the specified one. This behavior is the same for all version types with the exception of version type FORCE which always retrieves the document. Note that FORCE version type is deprecated.

Internally, Elasticsearch has marked the old document as deleted and added an entirely new document. The old version of the document doesn’t disappear immediately, although you won’t be able to access it. Elasticsearch cleans up deleted documents in the background as you continue to index more data.

+ Recent posts