Skip to content

Commit 303e408

Browse files
committed
fix for bug: #3640
Signed-off-by: Jitendra Kumar <[email protected]>
1 parent 65a590c commit 303e408

File tree

3 files changed

+93
-19
lines changed

3 files changed

+93
-19
lines changed

client/rest/src/main/java/org/opensearch/client/RestClient.java

+73-12
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public class RestClient implements Closeable {
131131
private volatile NodeTuple<List<Node>> nodeTuple;
132132
private final WarningsHandler warningsHandler;
133133
private final boolean compressionEnabled;
134+
private final boolean chunkedTransferEncodingEnabled;
134135

135136
RestClient(
136137
CloseableHttpAsyncClient client,
@@ -141,6 +142,20 @@ public class RestClient implements Closeable {
141142
NodeSelector nodeSelector,
142143
boolean strictDeprecationMode,
143144
boolean compressionEnabled
145+
) {
146+
this(client, defaultHeaders, nodes, pathPrefix, failureListener, nodeSelector, strictDeprecationMode, compressionEnabled, true);
147+
}
148+
149+
RestClient(
150+
CloseableHttpAsyncClient client,
151+
Header[] defaultHeaders,
152+
List<Node> nodes,
153+
String pathPrefix,
154+
FailureListener failureListener,
155+
NodeSelector nodeSelector,
156+
boolean strictDeprecationMode,
157+
boolean compressionEnabled,
158+
boolean chunkedTransferEncodingEnabled
144159
) {
145160
this.client = client;
146161
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
@@ -149,6 +164,7 @@ public class RestClient implements Closeable {
149164
this.nodeSelector = nodeSelector;
150165
this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE;
151166
this.compressionEnabled = compressionEnabled;
167+
this.chunkedTransferEncodingEnabled = chunkedTransferEncodingEnabled;
152168
setNodes(nodes);
153169
}
154170

@@ -583,36 +599,51 @@ private static void addSuppressedException(Exception suppressedException, Except
583599
}
584600
}
585601

586-
private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity, boolean compressionEnabled) {
602+
private static HttpRequestBase createHttpRequest(
603+
String method,
604+
URI uri,
605+
HttpEntity entity,
606+
boolean compressionEnabled,
607+
boolean chunkedTransferEncodingEnabled
608+
) {
587609
switch (method.toUpperCase(Locale.ROOT)) {
588610
case HttpDeleteWithEntity.METHOD_NAME:
589-
return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled);
611+
return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
590612
case HttpGetWithEntity.METHOD_NAME:
591-
return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled);
613+
return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
592614
case HttpHead.METHOD_NAME:
593-
return addRequestBody(new HttpHead(uri), entity, compressionEnabled);
615+
return addRequestBody(new HttpHead(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
594616
case HttpOptions.METHOD_NAME:
595-
return addRequestBody(new HttpOptions(uri), entity, compressionEnabled);
617+
return addRequestBody(new HttpOptions(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
596618
case HttpPatch.METHOD_NAME:
597-
return addRequestBody(new HttpPatch(uri), entity, compressionEnabled);
619+
return addRequestBody(new HttpPatch(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
598620
case HttpPost.METHOD_NAME:
599621
HttpPost httpPost = new HttpPost(uri);
600-
addRequestBody(httpPost, entity, compressionEnabled);
622+
addRequestBody(httpPost, entity, compressionEnabled, chunkedTransferEncodingEnabled);
601623
return httpPost;
602624
case HttpPut.METHOD_NAME:
603-
return addRequestBody(new HttpPut(uri), entity, compressionEnabled);
625+
return addRequestBody(new HttpPut(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
604626
case HttpTrace.METHOD_NAME:
605-
return addRequestBody(new HttpTrace(uri), entity, compressionEnabled);
627+
return addRequestBody(new HttpTrace(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled);
606628
default:
607629
throw new UnsupportedOperationException("http method not supported: " + method);
608630
}
609631
}
610632

611-
private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity, boolean compressionEnabled) {
633+
private static HttpRequestBase addRequestBody(
634+
HttpRequestBase httpRequest,
635+
HttpEntity entity,
636+
boolean compressionEnabled,
637+
boolean chunkedTransferEncodingEnabled
638+
) {
612639
if (entity != null) {
613640
if (httpRequest instanceof HttpEntityEnclosingRequestBase) {
614641
if (compressionEnabled) {
615-
entity = new ContentCompressingEntity(entity);
642+
if (chunkedTransferEncodingEnabled) {
643+
entity = new ContentCompressingChunkedEntity(entity);
644+
} else {
645+
entity = new ContentCompressingEntity(entity);
646+
}
616647
}
617648
((HttpEntityEnclosingRequestBase) httpRequest).setEntity(entity);
618649
} else {
@@ -782,7 +813,13 @@ private class InternalRequest {
782813
String ignoreString = params.remove("ignore");
783814
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod());
784815
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
785-
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity(), compressionEnabled);
816+
this.httpRequest = createHttpRequest(
817+
request.getMethod(),
818+
uri,
819+
request.getEntity(),
820+
compressionEnabled,
821+
chunkedTransferEncodingEnabled
822+
);
786823
this.cancellable = Cancellable.fromRequest(httpRequest);
787824
setHeaders(httpRequest, request.getOptions().getHeaders());
788825
setRequestConfig(httpRequest, request.getOptions().getRequestConfig());
@@ -932,6 +969,30 @@ private static Exception extractAndWrapCause(Exception exception) {
932969
return new RuntimeException("error while performing request", exception);
933970
}
934971

972+
/**
973+
* A gzip compressing entity that also implements {@code getContent()}.
974+
*/
975+
public static class ContentCompressingChunkedEntity extends GzipCompressingEntity {
976+
977+
/**
978+
* Creates a {@link ContentCompressingChunkedEntity} instance with the provided HTTP entity.
979+
*
980+
* @param entity the HTTP entity.
981+
*/
982+
public ContentCompressingChunkedEntity(HttpEntity entity) {
983+
super(entity);
984+
}
985+
986+
@Override
987+
public InputStream getContent() throws IOException {
988+
ByteArrayInputOutputStream out = new ByteArrayInputOutputStream(1024);
989+
try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) {
990+
wrappedEntity.writeTo(gzipOut);
991+
}
992+
return out.asInput();
993+
}
994+
}
995+
935996
/**
936997
* A gzip compressing entity that also implements {@code getContent()}.
937998
*/

client/rest/src/main/java/org/opensearch/client/RestClientBuilder.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public final class RestClientBuilder {
8484
private NodeSelector nodeSelector = NodeSelector.ANY;
8585
private boolean strictDeprecationMode = false;
8686
private boolean compressionEnabled = false;
87+
private boolean chunkedTransferEncodingEnabled = true;
8788

8889
/**
8990
* Creates a new builder instance and sets the hosts that the client will send requests to.
@@ -238,6 +239,16 @@ public RestClientBuilder setCompressionEnabled(boolean compressionEnabled) {
238239
return this;
239240
}
240241

242+
/**
243+
* Whether the REST client should use Transfer-Encoding: chunked for compress requests"
244+
*
245+
* @param chunkedTransferEncodingEnabled flag for enabling Transfer-Encoding: chunked
246+
*/
247+
public RestClientBuilder setChunkedTransferEncodingEnabled(boolean chunkedTransferEncodingEnabled) {
248+
this.chunkedTransferEncodingEnabled = chunkedTransferEncodingEnabled;
249+
return this;
250+
}
251+
241252
/**
242253
* Creates a new {@link RestClient} based on the provided configuration.
243254
*/
@@ -256,7 +267,8 @@ public RestClient build() {
256267
failureListener,
257268
nodeSelector,
258269
strictDeprecationMode,
259-
compressionEnabled
270+
compressionEnabled,
271+
chunkedTransferEncodingEnabled
260272
);
261273
httpClient.start();
262274
return restClient;

client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ public void handle(HttpExchange exchange) throws IOException {
9292
}
9393
}
9494

95-
/** Read all bytes of an input stream and close it. */
95+
/**
96+
* Read all bytes of an input stream and close it.
97+
*/
9698
private static byte[] readAll(InputStream in) throws IOException {
9799
byte[] buffer = new byte[1024];
98100
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -104,15 +106,16 @@ private static byte[] readAll(InputStream in) throws IOException {
104106
return bos.toByteArray();
105107
}
106108

107-
private RestClient createClient(boolean enableCompression) {
109+
private RestClient createClient(boolean enableCompression, boolean chunkedEnabled) {
108110
InetSocketAddress address = httpServer.getAddress();
109111
return RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
110112
.setCompressionEnabled(enableCompression)
113+
.setChunkedTransferEncodingEnabled(chunkedEnabled)
111114
.build();
112115
}
113116

114117
public void testCompressingClientWithContentLengthSync() throws Exception {
115-
RestClient restClient = createClient(true);
118+
RestClient restClient = createClient(true, false);
116119

117120
Request request = new Request("POST", "/");
118121
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));
@@ -129,9 +132,7 @@ public void testCompressingClientWithContentLengthSync() throws Exception {
129132

130133
public void testCompressingClientContentLengthAsync() throws Exception {
131134
InetSocketAddress address = httpServer.getAddress();
132-
RestClient restClient = RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
133-
.setCompressionEnabled(true)
134-
.build();
135+
RestClient restClient = createClient(true, false);
135136

136137
Request request = new Request("POST", "/");
137138
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));

0 commit comments

Comments
 (0)