Skip to content

Commit 0c4cb0f

Browse files
committed
[Remote Store] Add segment transfer timeout dynamic setting (opensearch-project#13679)
* [Remote Store] Add segment transfer timeout dynamic setting Signed-off-by: Varun Bansal <[email protected]> (cherry picked from commit b3049fb)
1 parent 03c13cb commit 0c4cb0f

File tree

8 files changed

+153
-12
lines changed

8 files changed

+153
-12
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
99
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
1010
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
11+
- [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679))
1112

1213
### Dependencies
1314
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
2727
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
2828
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
29-
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;
3029

3130
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
3231
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,7 @@ public void apply(Settings value, Settings current, Settings previous) {
739739
RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
740740
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
741741
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
742+
RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING,
742743
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
743744
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
744745
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -3985,7 +3985,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
39853985
new RemoteStoreRefreshListener(
39863986
this,
39873987
this.checkpointPublisher,
3988-
remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId())
3988+
remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId()),
3989+
remoteStoreSettings
39893990
)
39903991
);
39913992
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
3434
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
3535
import org.opensearch.index.translog.Translog;
36+
import org.opensearch.indices.RemoteStoreSettings;
3637
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
3738
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
3839
import org.opensearch.threadpool.ThreadPool;
@@ -45,6 +46,7 @@
4546
import java.util.Map;
4647
import java.util.Set;
4748
import java.util.concurrent.CountDownLatch;
49+
import java.util.concurrent.TimeUnit;
4850
import java.util.concurrent.atomic.AtomicBoolean;
4951
import java.util.stream.Collectors;
5052

@@ -89,11 +91,13 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh
8991
private volatile long primaryTerm;
9092
private volatile Iterator<TimeValue> backoffDelayIterator;
9193
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
94+
private final RemoteStoreSettings remoteStoreSettings;
9295

9396
public RemoteStoreRefreshListener(
9497
IndexShard indexShard,
9598
SegmentReplicationCheckpointPublisher checkpointPublisher,
96-
RemoteSegmentTransferTracker segmentTracker
99+
RemoteSegmentTransferTracker segmentTracker,
100+
RemoteStoreSettings remoteStoreSettings
97101
) {
98102
super(indexShard.getThreadPool());
99103
logger = Loggers.getLogger(getClass(), indexShard.shardId());
@@ -116,6 +120,7 @@ public RemoteStoreRefreshListener(
116120
this.segmentTracker = segmentTracker;
117121
resetBackOffDelayIterator();
118122
this.checkpointPublisher = checkpointPublisher;
123+
this.remoteStoreSettings = remoteStoreSettings;
119124
}
120125

121126
@Override
@@ -286,7 +291,12 @@ public void onFailure(Exception e) {
286291

287292
// Start the segments files upload
288293
uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener);
289-
latch.await();
294+
if (latch.await(
295+
remoteStoreSettings.getClusterRemoteSegmentTransferTimeout().millis(),
296+
TimeUnit.MILLISECONDS
297+
) == false) {
298+
throw new SegmentUploadFailedException("Timeout while waiting for remote segment transfer to complete");
299+
}
290300
} catch (EngineException e) {
291301
logger.warn("Exception while reading SegmentInfosSnapshot", e);
292302
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import java.io.IOException;
12+
13+
/**
14+
* Exception to be thrown when a segment upload fails.
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class SegmentUploadFailedException extends IOException {
19+
20+
/**
21+
* Creates a new SegmentUploadFailedException.
22+
*
23+
* @param message error message
24+
*/
25+
public SegmentUploadFailedException(String message) {
26+
super(message);
27+
}
28+
}

server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java

+25
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,21 @@ public class RemoteStoreSettings {
105105
Property.NodeScope
106106
);
107107

108+
/**
109+
* Controls timeout value while uploading segment files to remote segment store
110+
*/
111+
public static final Setting<TimeValue> CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting(
112+
"cluster.remote_store.segment.transfer_timeout",
113+
TimeValue.timeValueMinutes(30),
114+
TimeValue.timeValueMinutes(10),
115+
Property.NodeScope,
116+
Property.Dynamic
117+
);
118+
108119
private volatile TimeValue clusterRemoteTranslogBufferInterval;
109120
private volatile int minRemoteSegmentMetadataFiles;
110121
private volatile TimeValue clusterRemoteTranslogTransferTimeout;
122+
private volatile TimeValue clusterRemoteSegmentTransferTimeout;
111123
private volatile RemoteStoreEnums.PathType pathType;
112124
private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm;
113125
private volatile int maxRemoteTranslogReaders;
@@ -140,6 +152,11 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
140152
maxRemoteTranslogReaders = CLUSTER_REMOTE_MAX_TRANSLOG_READERS.get(settings);
141153
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_TRANSLOG_READERS, this::setMaxRemoteTranslogReaders);
142154

155+
clusterRemoteSegmentTransferTimeout = CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING.get(settings);
156+
clusterSettings.addSettingsUpdateConsumer(
157+
CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING,
158+
this::setClusterRemoteSegmentTransferTimeout
159+
);
143160
}
144161

145162
public TimeValue getClusterRemoteTranslogBufferInterval() {
@@ -162,10 +179,18 @@ public TimeValue getClusterRemoteTranslogTransferTimeout() {
162179
return clusterRemoteTranslogTransferTimeout;
163180
}
164181

182+
public TimeValue getClusterRemoteSegmentTransferTimeout() {
183+
return clusterRemoteSegmentTransferTimeout;
184+
}
185+
165186
private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) {
166187
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
167188
}
168189

190+
private void setClusterRemoteSegmentTransferTimeout(TimeValue clusterRemoteSegmentTransferTimeout) {
191+
this.clusterRemoteSegmentTransferTimeout = clusterRemoteSegmentTransferTimeout;
192+
}
193+
169194
@ExperimentalApi
170195
public RemoteStoreEnums.PathType getPathType() {
171196
return pathType;

server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java

+84-8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.opensearch.common.lease.Releasable;
2424
import org.opensearch.common.settings.ClusterSettings;
2525
import org.opensearch.common.settings.Settings;
26+
import org.opensearch.common.unit.TimeValue;
2627
import org.opensearch.core.action.ActionListener;
2728
import org.opensearch.core.index.shard.ShardId;
2829
import org.opensearch.index.engine.InternalEngineFactory;
@@ -34,6 +35,7 @@
3435
import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils;
3536
import org.opensearch.index.store.Store;
3637
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
38+
import org.opensearch.indices.DefaultRemoteStoreSettings;
3739
import org.opensearch.indices.RemoteStoreSettings;
3840
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
3941
import org.opensearch.indices.replication.common.ReplicationType;
@@ -90,7 +92,12 @@ public void setup(boolean primary, int numberOfDocs) throws IOException {
9092
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY);
9193
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);
9294
RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
93-
remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY, tracker);
95+
remoteStoreRefreshListener = new RemoteStoreRefreshListener(
96+
indexShard,
97+
SegmentReplicationCheckpointPublisher.EMPTY,
98+
tracker,
99+
DefaultRemoteStoreSettings.INSTANCE
100+
);
94101
}
95102

96103
private void indexDocs(int startDocId, int numberOfDocs) throws IOException {
@@ -175,7 +182,12 @@ public void testRemoteDirectoryInitThrowsException() throws IOException {
175182
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);
176183

177184
// Since the thrown IOException is caught in the constructor, ctor should be invoked successfully.
178-
new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY, mock(RemoteSegmentTransferTracker.class));
185+
new RemoteStoreRefreshListener(
186+
shard,
187+
SegmentReplicationCheckpointPublisher.EMPTY,
188+
mock(RemoteSegmentTransferTracker.class),
189+
DefaultRemoteStoreSettings.INSTANCE
190+
);
179191

180192
// Validate that the stream of metadata file of remoteMetadataDirectory has been opened only once and the
181193
// listFilesByPrefixInLexicographicOrder has been called twice.
@@ -370,6 +382,33 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
370382
assertNoLagAndTotalUploadsFailed(segmentTracker, 1);
371383
}
372384

385+
public void testSegmentUploadTimeout() throws Exception {
386+
// This covers the case where segment upload fails due to timeout
387+
int succeedOnAttempt = 1;
388+
// We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation.
389+
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
390+
CountDownLatch successLatch = new CountDownLatch(2);
391+
Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
392+
succeedOnAttempt,
393+
refreshCountLatch,
394+
successLatch,
395+
1,
396+
new CountDownLatch(0),
397+
true,
398+
true
399+
);
400+
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
401+
assertBusy(() -> assertEquals(1, successLatch.getCount()));
402+
RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2();
403+
RemoteSegmentTransferTracker segmentTracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
404+
assertBusy(() -> {
405+
assertTrue(segmentTracker.getTotalUploadsFailed() > 1);
406+
assertTrue(segmentTracker.getTotalUploadsSucceeded() < 2);
407+
});
408+
// shutdown threadpool for avoid leaking threads
409+
indexShard.getThreadPool().shutdownNow();
410+
}
411+
373412
/**
374413
* Tests retry flow after snapshot and metadata files have been uploaded to remote store in the failed attempt.
375414
* Snapshot and metadata files created in failed attempt should not break retry.
@@ -469,6 +508,7 @@ public void testRefreshFailedDueToPrimaryTermMisMatch() throws Exception {
469508
successLatch,
470509
checkpointPublishSucceedOnAttempt,
471510
reachedCheckpointPublishLatch,
511+
false,
472512
false
473513
);
474514

@@ -520,7 +560,8 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
520560
successLatch,
521561
succeedCheckpointPublishOnAttempt,
522562
reachedCheckpointPublishLatch,
523-
true
563+
true,
564+
false
524565
);
525566
}
526567

@@ -530,7 +571,8 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
530571
CountDownLatch successLatch,
531572
int succeedCheckpointPublishOnAttempt,
532573
CountDownLatch reachedCheckpointPublishLatch,
533-
boolean mockPrimaryTerm
574+
boolean mockPrimaryTerm,
575+
boolean testUploadTimeout
534576
) throws IOException {
535577
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
536578
indexShard = newStartedShard(
@@ -564,9 +606,22 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
564606
// Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
565607
Store remoteStore = mock(Store.class);
566608
when(shard.remoteStore()).thenReturn(remoteStore);
567-
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
568-
(RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate())
569-
.getDelegate();
609+
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory;
610+
RemoteDirectory remoteDirectory = mock(RemoteDirectory.class);
611+
612+
if (testUploadTimeout) {
613+
remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(
614+
remoteDirectory,
615+
mock(RemoteDirectory.class),
616+
mock(RemoteStoreLockManager.class),
617+
indexShard.getThreadPool(),
618+
indexShard.shardId
619+
);
620+
} else {
621+
remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore()
622+
.directory()).getDelegate()).getDelegate();
623+
}
624+
570625
FilterDirectory remoteStoreFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(remoteSegmentStoreDirectory));
571626
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);
572627

@@ -638,7 +693,28 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
638693
RemoteStoreSettings remoteStoreSettings = mock(RemoteStoreSettings.class);
639694
when(remoteStoreSettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10);
640695
when(shard.getRemoteStoreSettings()).thenReturn(remoteStoreSettings);
641-
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker);
696+
if (testUploadTimeout) {
697+
when(remoteStoreSettings.getClusterRemoteSegmentTransferTimeout()).thenReturn(TimeValue.timeValueMillis(10));
698+
doAnswer(invocation -> {
699+
ActionListener<Void> actionListener = invocation.getArgument(5);
700+
indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> {
701+
try {
702+
Thread.sleep(30000);
703+
} catch (InterruptedException e) {
704+
logger.warn("copyFrom thread interrupted during sleep");
705+
}
706+
actionListener.onResponse(null);
707+
});
708+
return true;
709+
}).when(remoteDirectory).copyFrom(any(), any(), any(), any(), any(), any(ActionListener.class), any(Boolean.class));
710+
}
711+
712+
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(
713+
shard,
714+
emptyCheckpointPublisher,
715+
tracker,
716+
remoteStoreSettings
717+
);
642718
refreshListener.afterRefresh(true);
643719
return Tuple.tuple(refreshListener, remoteStoreStatsTrackerFactory);
644720
}

0 commit comments

Comments
 (0)