-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reindex: Use request flavored methods #30317
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,14 +19,14 @@ | |
|
||
package org.elasticsearch.index.reindex.remote; | ||
|
||
import org.apache.http.HttpEntity; | ||
import org.apache.http.entity.ByteArrayEntity; | ||
import org.apache.http.entity.ContentType; | ||
import org.apache.http.entity.StringEntity; | ||
import org.apache.lucene.util.BytesRef; | ||
import org.elasticsearch.ElasticsearchException; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.action.search.SearchRequest; | ||
import org.elasticsearch.client.Request; | ||
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
|
@@ -40,33 +40,27 @@ | |
import org.elasticsearch.search.sort.SortBuilder; | ||
|
||
import java.io.IOException; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
import static java.util.Collections.singletonMap; | ||
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; | ||
|
||
/** | ||
* Builds requests for remote version of Elasticsearch. Note that unlike most of the | ||
* rest of Elasticsearch this file needs to be compatible with very old versions of | ||
* Elasticsearch. Thus is often uses identifiers for versions like {@code 2000099} | ||
* Elasticsearch. Thus it often uses identifiers for versions like {@code 2000099} | ||
* for {@code 2.0.0-alpha1}. Do not drop support for features from this file just | ||
* because the version constants have been removed. | ||
*/ | ||
final class RemoteRequestBuilders { | ||
private RemoteRequestBuilders() {} | ||
|
||
static String initialSearchPath(SearchRequest searchRequest) { | ||
static Request initialSearch(SearchRequest searchRequest, BytesReference query, Version remoteVersion) { | ||
// It is nasty to build paths with StringBuilder but we'll be careful.... | ||
StringBuilder path = new StringBuilder("/"); | ||
addIndexesOrTypes(path, "Index", searchRequest.indices()); | ||
addIndexesOrTypes(path, "Type", searchRequest.types()); | ||
path.append("_search"); | ||
return path.toString(); | ||
} | ||
Request request = new Request("POST", path.toString()); | ||
|
||
static Map<String, String> initialSearchParams(SearchRequest searchRequest, Version remoteVersion) { | ||
Map<String, String> params = new HashMap<>(); | ||
if (searchRequest.scroll() != null) { | ||
TimeValue keepAlive = searchRequest.scroll().keepAlive(); | ||
if (remoteVersion.before(Version.V_5_0_0)) { | ||
|
@@ -75,16 +69,16 @@ static Map<String, String> initialSearchParams(SearchRequest searchRequest, Vers | |
* timeout seems safer than less. */ | ||
keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac())); | ||
} | ||
params.put("scroll", keepAlive.getStringRep()); | ||
request.addParameter("scroll", keepAlive.getStringRep()); | ||
} | ||
params.put("size", Integer.toString(searchRequest.source().size())); | ||
request.addParameter("size", Integer.toString(searchRequest.source().size())); | ||
if (searchRequest.source().version() == null || searchRequest.source().version() == true) { | ||
/* | ||
* Passing `null` here just add the `version` request parameter | ||
* without any value. This way of requesting the version works | ||
* for all supported versions of Elasticsearch. | ||
*/ | ||
params.put("version", null); | ||
request.addParameter("version", null); | ||
} | ||
if (searchRequest.source().sorts() != null) { | ||
boolean useScan = false; | ||
|
@@ -101,13 +95,13 @@ static Map<String, String> initialSearchParams(SearchRequest searchRequest, Vers | |
} | ||
} | ||
if (useScan) { | ||
params.put("search_type", "scan"); | ||
request.addParameter("search_type", "scan"); | ||
} else { | ||
StringBuilder sorts = new StringBuilder(sortToUri(searchRequest.source().sorts().get(0))); | ||
for (int i = 1; i < searchRequest.source().sorts().size(); i++) { | ||
sorts.append(',').append(sortToUri(searchRequest.source().sorts().get(i))); | ||
} | ||
params.put("sort", sorts.toString()); | ||
request.addParameter("sort", sorts.toString()); | ||
} | ||
} | ||
if (remoteVersion.before(Version.fromId(2000099))) { | ||
|
@@ -126,20 +120,18 @@ static Map<String, String> initialSearchParams(SearchRequest searchRequest, Vers | |
fields.append(',').append(searchRequest.source().storedFields().fieldNames().get(i)); | ||
} | ||
String storedFieldsParamName = remoteVersion.before(Version.V_5_0_0_alpha4) ? "fields" : "stored_fields"; | ||
params.put(storedFieldsParamName, fields.toString()); | ||
request.addParameter(storedFieldsParamName, fields.toString()); | ||
} | ||
return params; | ||
} | ||
|
||
static HttpEntity initialSearchEntity(SearchRequest searchRequest, BytesReference query, Version remoteVersion) { | ||
// EMPTY is safe here because we're not calling namedObject | ||
try (XContentBuilder entity = JsonXContent.contentBuilder(); | ||
XContentParser queryParser = XContentHelper | ||
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, query)) { | ||
entity.startObject(); | ||
|
||
entity.field("query"); { | ||
/* We're intentionally a bit paranoid here - copying the query as xcontent rather than writing a raw field. We don't want | ||
/* We're intentionally a bit paranoid here - copying the query | ||
* as xcontent rather than writing a raw field. We don't want | ||
* poorly written queries to escape. Ever. */ | ||
entity.copyCurrentStructure(queryParser); | ||
XContentParser.Token shouldBeEof = queryParser.nextToken(); | ||
|
@@ -160,10 +152,11 @@ static HttpEntity initialSearchEntity(SearchRequest searchRequest, BytesReferenc | |
|
||
entity.endObject(); | ||
BytesRef bytes = BytesReference.bytes(entity).toBytesRef(); | ||
return new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON); | ||
request.setEntity(new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON)); | ||
} catch (IOException e) { | ||
throw new ElasticsearchException("unexpected error building entity", e); | ||
} | ||
return request; | ||
} | ||
|
||
private static void addIndexesOrTypes(StringBuilder path, String name, String[] indicesOrTypes) { | ||
|
@@ -193,45 +186,50 @@ private static String sortToUri(SortBuilder<?> sort) { | |
throw new IllegalArgumentException("Unsupported sort [" + sort + "]"); | ||
} | ||
|
||
static String scrollPath() { | ||
return "/_search/scroll"; | ||
} | ||
static Request scroll(String scroll, TimeValue keepAlive, Version remoteVersion) { | ||
Request request = new Request("POST", "/_search/scroll"); | ||
|
||
static Map<String, String> scrollParams(TimeValue keepAlive, Version remoteVersion) { | ||
if (remoteVersion.before(Version.V_5_0_0)) { | ||
/* Versions of Elasticsearch before 5.0 couldn't parse nanos or micros | ||
* so we toss out that resolution, rounding up so we shouldn't end up | ||
* with 0s. */ | ||
keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac())); | ||
} | ||
return singletonMap("scroll", keepAlive.getStringRep()); | ||
} | ||
request.addParameter("scroll", keepAlive.getStringRep()); | ||
|
||
static HttpEntity scrollEntity(String scroll, Version remoteVersion) { | ||
if (remoteVersion.before(Version.fromId(2000099))) { | ||
// Versions before 2.0.0 extract the plain scroll_id from the body | ||
return new StringEntity(scroll, ContentType.TEXT_PLAIN); | ||
request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN)); | ||
return request; | ||
} | ||
|
||
try (XContentBuilder entity = JsonXContent.contentBuilder()) { | ||
return new StringEntity(Strings.toString(entity.startObject() | ||
.field("scroll_id", scroll) | ||
.endObject()), ContentType.APPLICATION_JSON); | ||
entity.startObject() | ||
.field("scroll_id", scroll) | ||
.endObject(); | ||
request.setEntity(new StringEntity(Strings.toString(entity), ContentType.APPLICATION_JSON)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmmm. I think I'll merge as is and make a followup with those so we can talk about them! |
||
} catch (IOException e) { | ||
throw new ElasticsearchException("failed to build scroll entity", e); | ||
} | ||
return request; | ||
} | ||
|
||
static HttpEntity clearScrollEntity(String scroll, Version remoteVersion) { | ||
static Request clearScroll(String scroll, Version remoteVersion) { | ||
Request request = new Request("DELETE", "/_search/scroll"); | ||
|
||
if (remoteVersion.before(Version.fromId(2000099))) { | ||
// Versions before 2.0.0 extract the plain scroll_id from the body | ||
return new StringEntity(scroll, ContentType.TEXT_PLAIN); | ||
request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN)); | ||
return request; | ||
} | ||
try (XContentBuilder entity = JsonXContent.contentBuilder()) { | ||
return new StringEntity(Strings.toString(entity.startObject() | ||
.array("scroll_id", scroll) | ||
.endObject()), ContentType.APPLICATION_JSON); | ||
entity.startObject() | ||
.array("scroll_id", scroll) | ||
.endObject(); | ||
request.setEntity(new StringEntity(Strings.toString(entity), ContentType.APPLICATION_JSON)); | ||
} catch (IOException e) { | ||
throw new ElasticsearchException("failed to build clear scroll entity", e); | ||
} | ||
return request; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: should we use
NByteArrayEntity
instead?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dunno! I've never really into the difference.