Skip to content

Commit 45ec5ed

Browse files
committed
fix for bug: #3640
Signed-off-by: Jitendra Kumar <[email protected]>
1 parent 7005b9e commit 45ec5ed

File tree

2 files changed

+223
-0
lines changed

2 files changed

+223
-0
lines changed

client/rest/src/main/java/org/opensearch/client/RestClient.java

+35
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,41 @@ public InputStream getContent() throws IOException {
954954
}
955955
return out.asInput();
956956
}
957+
958+
/**
959+
* A gzip compressing enrity doesn't worked with chunked encoding with sigv4
960+
*
961+
* @return false
962+
*/
963+
@Override
964+
public boolean isChunked() {
965+
return false;
966+
}
967+
968+
/**
969+
* A gzip entity require to content length in http headers
970+
* as it doesn't work with chunked encoding for sigv4
971+
*
972+
* @return content lenght of gzip entity
973+
*/
974+
@Override
975+
public long getContentLength() {
976+
long size = 0;
977+
int chunk = 0;
978+
byte[] buffer = new byte[1024];
979+
980+
try {
981+
InputStream is = getContent();
982+
983+
while ((chunk = is.read(buffer)) != -1) {
984+
size += chunk;
985+
}
986+
} catch (Exception ex) {
987+
throw new RuntimeException("failed to get compressed content lenght: " + ex.getMessage());
988+
}
989+
990+
return size;
991+
}
957992
}
958993

959994
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Licensed to Elasticsearch under one or more contributor
11+
* license agreements. See the NOTICE file distributed with
12+
* this work for additional information regarding copyright
13+
* ownership. Elasticsearch licenses this file to you under
14+
* the Apache License, Version 2.0 (the "License"); you may
15+
* not use this file except in compliance with the License.
16+
* You may obtain a copy of the License at
17+
*
18+
* http://www.apache.org/licenses/LICENSE-2.0
19+
*
20+
* Unless required by applicable law or agreed to in writing,
21+
* software distributed under the License is distributed on an
22+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23+
* KIND, either express or implied. See the License for the
24+
* specific language governing permissions and limitations
25+
* under the License.
26+
*/
27+
28+
/*
29+
* Modifications Copyright OpenSearch Contributors. See
30+
* GitHub history for details.
31+
*/
32+
33+
package org.opensearch.client;
34+
35+
import com.sun.net.httpserver.HttpExchange;
36+
import com.sun.net.httpserver.HttpHandler;
37+
import com.sun.net.httpserver.HttpServer;
38+
import org.apache.http.HttpEntity;
39+
import org.apache.http.HttpHost;
40+
import org.apache.http.entity.ContentType;
41+
import org.apache.http.entity.StringEntity;
42+
import org.junit.AfterClass;
43+
import org.junit.Assert;
44+
import org.junit.BeforeClass;
45+
46+
import java.io.ByteArrayOutputStream;
47+
import java.io.IOException;
48+
import java.io.InputStream;
49+
import java.io.OutputStream;
50+
import java.net.InetAddress;
51+
import java.net.InetSocketAddress;
52+
import java.nio.charset.StandardCharsets;
53+
import java.util.concurrent.CompletableFuture;
54+
import java.util.zip.GZIPInputStream;
55+
import java.util.zip.GZIPOutputStream;
56+
57+
public class RestClientCompressionTests extends RestClientTestCase {
58+
59+
private static HttpServer httpServer;
60+
61+
@BeforeClass
62+
public static void startHttpServer() throws Exception {
63+
httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
64+
httpServer.createContext("/", new GzipResponseHandler());
65+
httpServer.start();
66+
}
67+
68+
@AfterClass
69+
public static void stopHttpServers() throws IOException {
70+
httpServer.stop(0);
71+
httpServer = null;
72+
}
73+
74+
/**
75+
* A response handler that accepts gzip-encoded data and replies request and response encoding values
76+
* followed by the request body. The response is compressed if "Accept-Encoding" is "gzip".
77+
*/
78+
private static class GzipResponseHandler implements HttpHandler {
79+
@Override
80+
public void handle(HttpExchange exchange) throws IOException {
81+
82+
// Decode body (if any)
83+
String contentEncoding = exchange.getRequestHeaders().getFirst("Content-Encoding");
84+
String contentLength = exchange.getRequestHeaders().getFirst("Content-Length");
85+
InputStream body = exchange.getRequestBody();
86+
boolean compressedRequest = false;
87+
if ("gzip".equals(contentEncoding)) {
88+
body = new GZIPInputStream(body);
89+
compressedRequest = true;
90+
}
91+
byte[] bytes = readAll(body);
92+
boolean compress = "gzip".equals(exchange.getRequestHeaders().getFirst("Accept-Encoding"));
93+
if (compress) {
94+
exchange.getResponseHeaders().add("Content-Encoding", "gzip");
95+
}
96+
97+
exchange.sendResponseHeaders(200, 0);
98+
99+
// Encode response if needed
100+
OutputStream out = exchange.getResponseBody();
101+
if (compress) {
102+
out = new GZIPOutputStream(out);
103+
}
104+
105+
// Outputs <request-encoding|null>#<response-encoding|null>#<request-body>
106+
out.write(String.valueOf(contentEncoding).getBytes(StandardCharsets.UTF_8));
107+
out.write('#');
108+
out.write((compress ? "gzip" : "null").getBytes(StandardCharsets.UTF_8));
109+
out.write('#');
110+
out.write((compressedRequest ? contentLength : "null").getBytes(StandardCharsets.UTF_8));
111+
out.write('#');
112+
out.write(bytes);
113+
out.close();
114+
115+
exchange.close();
116+
}
117+
}
118+
119+
/** Read all bytes of an input stream and close it. */
120+
private static byte[] readAll(InputStream in) throws IOException {
121+
byte[] buffer = new byte[1024];
122+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
123+
int len = 0;
124+
while ((len = in.read(buffer)) > 0) {
125+
bos.write(buffer, 0, len);
126+
}
127+
in.close();
128+
return bos.toByteArray();
129+
}
130+
131+
private RestClient createClient(boolean enableCompression) {
132+
InetSocketAddress address = httpServer.getAddress();
133+
return RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
134+
.setCompressionEnabled(enableCompression)
135+
.build();
136+
}
137+
138+
public void testCompressingClientWithContentLengthSync() throws Exception {
139+
RestClient restClient = createClient(true);
140+
141+
Request request = new Request("POST", "/");
142+
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));
143+
144+
Response response = restClient.performRequest(request);
145+
146+
HttpEntity entity = response.getEntity();
147+
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
148+
// Content-Encoding#Accept-Encoding#Content-Length#Content
149+
Assert.assertEquals("gzip#gzip#38#compressing client", content);
150+
151+
restClient.close();
152+
}
153+
154+
public void testCompressingClientContentLengthAsync() throws Exception {
155+
InetSocketAddress address = httpServer.getAddress();
156+
RestClient restClient = RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
157+
.setCompressionEnabled(true)
158+
.build();
159+
160+
Request request = new Request("POST", "/");
161+
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));
162+
163+
FutureResponse futureResponse = new FutureResponse();
164+
restClient.performRequestAsync(request, futureResponse);
165+
Response response = futureResponse.get();
166+
167+
// Server should report it had a compressed request and sent back a compressed response
168+
HttpEntity entity = response.getEntity();
169+
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
170+
171+
// Content-Encoding#Accept-Encoding#Content-Length#Content
172+
Assert.assertEquals("gzip#gzip#38#compressing client", content);
173+
174+
restClient.close();
175+
}
176+
177+
public static class FutureResponse extends CompletableFuture<Response> implements ResponseListener {
178+
@Override
179+
public void onSuccess(Response response) {
180+
this.complete(response);
181+
}
182+
183+
@Override
184+
public void onFailure(Exception exception) {
185+
this.completeExceptionally(exception);
186+
}
187+
}
188+
}

0 commit comments

Comments
 (0)