Skip to content

Commit 16eb7d2

Browse files
committed
Add integration tests to cover segment upload timeouts
Signed-off-by: Varun Bansal <[email protected]>
1 parent d15ded5 commit 16eb7d2

File tree

6 files changed

+146
-39
lines changed

6 files changed

+146
-39
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
2727
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
2828
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
29-
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;
3029

3130
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
3231
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequest;
12+
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.common.unit.TimeValue;
15+
import org.opensearch.indices.RemoteStoreSettings;
16+
import org.opensearch.plugins.Plugin;
17+
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
18+
import org.opensearch.test.OpenSearchIntegTestCase;
19+
20+
import java.nio.file.Path;
21+
import java.util.Arrays;
22+
import java.util.Collection;
23+
import java.util.Locale;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
27+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
28+
29+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
30+
public class RemoteStoreRefreshListenerMultipartIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
31+
32+
protected Collection<Class<? extends Plugin>> nodePlugins() {
33+
return Arrays.asList(MockFsRepositoryPlugin.class);
34+
}
35+
36+
@Override
37+
public Settings buildRemoteStoreNodeAttributes(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
38+
Settings settings = super.buildRemoteStoreNodeAttributes(repoLocation, ioFailureRate, skipExceptionBlobList, maxFailure);
39+
String segmentRepoTypeAttributeKey = String.format(
40+
Locale.getDefault(),
41+
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
42+
REPOSITORY_NAME
43+
);
44+
String translogRepoTypeAttributeKey = String.format(
45+
Locale.getDefault(),
46+
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
47+
TRANSLOG_REPOSITORY_NAME
48+
);
49+
50+
String stateRepoTypeAttributeKey = String.format(
51+
Locale.getDefault(),
52+
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
53+
REPOSITORY_NAME
54+
);
55+
56+
return Settings.builder()
57+
.put(settings)
58+
.put(segmentRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE)
59+
.put(translogRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE)
60+
.put(stateRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE)
61+
.build();
62+
}
63+
64+
public void testRemoteRefreshSegmentUploadTimeout() throws Exception {
65+
Path location = randomRepoPath().toAbsolutePath();
66+
setup(location, randomDoubleBetween(0.1, 0.15, true), "metadata", 10L);
67+
68+
client().admin()
69+
.cluster()
70+
.prepareUpdateSettings()
71+
.setPersistentSettings(Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), false))
72+
.setPersistentSettings(
73+
Settings.builder()
74+
.put(RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(1))
75+
)
76+
.get();
77+
78+
// Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed
79+
// due to IOExceptions that are thrown while doing uploadBlobs.
80+
indexData(randomIntBetween(5, 10), randomBoolean());
81+
logger.info("--> Indexed data");
82+
logger.info("--> Verify that the segment upload fails");
83+
try {
84+
assertBusy(() -> {
85+
RemoteStoreStatsResponse remoteStoreStatsResponse = client().admin()
86+
.cluster()
87+
.remoteStoreStats(new RemoteStoreStatsRequest())
88+
.get();
89+
Arrays.asList(remoteStoreStatsResponse.getRemoteStoreStats()).forEach(remoteStoreStats -> {
90+
assertTrue(remoteStoreStats.getSegmentStats().totalUploadsFailed > 10);
91+
});
92+
}, 10, TimeUnit.SECONDS);
93+
} catch (Exception e) {
94+
throw new RuntimeException(e);
95+
}
96+
cleanupRepo();
97+
}
98+
}

server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java

+40-37
Original file line numberDiff line numberDiff line change
@@ -76,46 +76,49 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
7676
});
7777
thread.start();
7878
}
79-
try {
80-
if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
81-
throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName());
82-
}
83-
} catch (InterruptedException e) {
84-
throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName());
85-
}
86-
try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
87-
outputStream.write(buffer);
88-
}
89-
if (writeContext.getFileSize() != totalContentRead.get()) {
90-
throw new IOException(
91-
"Incorrect content length read for file "
92-
+ writeContext.getFileName()
93-
+ ", actual file size: "
94-
+ writeContext.getFileSize()
95-
+ ", bytes read: "
96-
+ totalContentRead.get()
97-
);
98-
}
9979

100-
try {
101-
// bulks need to succeed for segment files to be generated
102-
if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) {
103-
completionListener.onFailure(
104-
new RuntimeException(
105-
new CorruptIndexException(
106-
"Data integrity check failure for file: " + writeContext.getFileName(),
107-
writeContext.getFileName()
80+
Thread thread = new Thread(() -> {
81+
try {
82+
try {
83+
if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
84+
throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName());
85+
}
86+
} catch (InterruptedException e) {
87+
throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName());
88+
}
89+
try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
90+
outputStream.write(buffer);
91+
}
92+
if (writeContext.getFileSize() != totalContentRead.get()) {
93+
throw new IOException(
94+
"Incorrect content length read for file "
95+
+ writeContext.getFileName()
96+
+ ", actual file size: "
97+
+ writeContext.getFileSize()
98+
+ ", bytes read: "
99+
+ totalContentRead.get()
100+
);
101+
}
102+
103+
// bulks need to succeed for segment files to be generated
104+
if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) {
105+
completionListener.onFailure(
106+
new RuntimeException(
107+
new CorruptIndexException(
108+
"Data integrity check failure for file: " + writeContext.getFileName(),
109+
writeContext.getFileName()
110+
)
108111
)
109-
)
110-
);
111-
} else {
112-
writeContext.getUploadFinalizer().accept(true);
113-
completionListener.onResponse(null);
112+
);
113+
} else {
114+
writeContext.getUploadFinalizer().accept(true);
115+
completionListener.onResponse(null);
116+
}
117+
} catch (Exception e) {
118+
completionListener.onFailure(e);
114119
}
115-
} catch (Exception e) {
116-
completionListener.onFailure(e);
117-
}
118-
120+
});
121+
thread.start();
119122
}
120123

121124
@Override

server/src/main/java/org/opensearch/client/ClusterAdminClient.java

+2
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,8 @@ public interface ClusterAdminClient extends OpenSearchClient {
322322

323323
void remoteStoreStats(RemoteStoreStatsRequest request, ActionListener<RemoteStoreStatsResponse> listener);
324324

325+
/**
 * Executes a remote store stats request, returning an {@link ActionFuture} that resolves
 * to the {@link RemoteStoreStatsResponse} (future-returning counterpart of the
 * listener-based {@code remoteStoreStats} overload).
 *
 * @param request the remote store stats request
 * @return a future holding the stats response
 */
ActionFuture<RemoteStoreStatsResponse> remoteStoreStats(RemoteStoreStatsRequest request);
326+
325327
RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId);
326328

327329
/**

server/src/main/java/org/opensearch/client/support/AbstractClient.java

+5
Original file line numberDiff line numberDiff line change
@@ -922,6 +922,11 @@ public void remoteStoreStats(final RemoteStoreStatsRequest request, final Action
922922
execute(RemoteStoreStatsAction.INSTANCE, request, listener);
923923
}
924924

925+
@Override
926+
public ActionFuture<RemoteStoreStatsResponse> remoteStoreStats(final RemoteStoreStatsRequest request) {
927+
return execute(RemoteStoreStatsAction.INSTANCE, request);
928+
}
929+
925930
@Override
926931
public RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId) {
927932
RemoteStoreStatsRequestBuilder remoteStoreStatsRequestBuilder = new RemoteStoreStatsRequestBuilder(

server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public class RemoteStoreSettings {
110110
*/
111111
public static final Setting<TimeValue> CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting(
112112
"cluster.remote_store.segment.transfer_timeout",
113-
TimeValue.timeValueHours(3),
113+
TimeValue.timeValueHours(1),
114114
TimeValue.timeValueMinutes(10),
115115
Property.NodeScope,
116116
Property.Dynamic

0 commit comments

Comments
 (0)