Skip to content

Commit b3049fb

Browse files
authored
[Remote Store] Add segment transfer timeout dynamic setting (opensearch-project#13679)
* [Remote Store] Add segment transfer timeout dynamic setting Signed-off-by: Varun Bansal <[email protected]>
1 parent 5441d55 commit b3049fb

File tree

8 files changed

+154
-12
lines changed

8 files changed

+154
-12
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
1010
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
1111
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
12+
- [Remote Store] Add dynamic cluster setting to set timeout for segment uploads to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679))
1213

1314
### Dependencies
1415
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
2727
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
2828
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
29-
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;
3029

3130
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
3231
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,7 @@ public void apply(Settings value, Settings current, Settings previous) {
737737
RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
738738
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
739739
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
740+
RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING,
740741
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
741742
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
742743
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -3970,7 +3970,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
39703970
new RemoteStoreRefreshListener(
39713971
this,
39723972
this.checkpointPublisher,
3973-
remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId())
3973+
remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId()),
3974+
remoteStoreSettings
39743975
)
39753976
);
39763977
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
3434
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
3535
import org.opensearch.index.translog.Translog;
36+
import org.opensearch.indices.RemoteStoreSettings;
3637
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
3738
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
3839
import org.opensearch.threadpool.ThreadPool;
@@ -45,6 +46,7 @@
4546
import java.util.Map;
4647
import java.util.Set;
4748
import java.util.concurrent.CountDownLatch;
49+
import java.util.concurrent.TimeUnit;
4850
import java.util.concurrent.atomic.AtomicBoolean;
4951
import java.util.stream.Collectors;
5052

@@ -89,11 +91,13 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh
8991
private volatile long primaryTerm;
9092
private volatile Iterator<TimeValue> backoffDelayIterator;
9193
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
94+
private final RemoteStoreSettings remoteStoreSettings;
9295

9396
public RemoteStoreRefreshListener(
9497
IndexShard indexShard,
9598
SegmentReplicationCheckpointPublisher checkpointPublisher,
96-
RemoteSegmentTransferTracker segmentTracker
99+
RemoteSegmentTransferTracker segmentTracker,
100+
RemoteStoreSettings remoteStoreSettings
97101
) {
98102
super(indexShard.getThreadPool());
99103
logger = Loggers.getLogger(getClass(), indexShard.shardId());
@@ -116,6 +120,7 @@ public RemoteStoreRefreshListener(
116120
this.segmentTracker = segmentTracker;
117121
resetBackOffDelayIterator();
118122
this.checkpointPublisher = checkpointPublisher;
123+
this.remoteStoreSettings = remoteStoreSettings;
119124
}
120125

121126
@Override
@@ -286,7 +291,12 @@ public void onFailure(Exception e) {
286291

287292
// Start the segments files upload
288293
uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener);
289-
latch.await();
294+
if (latch.await(
295+
remoteStoreSettings.getClusterRemoteSegmentTransferTimeout().millis(),
296+
TimeUnit.MILLISECONDS
297+
) == false) {
298+
throw new SegmentUploadFailedException("Timeout while waiting for remote segment transfer to complete");
299+
}
290300
} catch (EngineException e) {
291301
logger.warn("Exception while reading SegmentInfosSnapshot", e);
292302
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import java.io.IOException;
12+
13+
/**
14+
* Exception to be thrown when a segment upload fails, for example when the upload does not complete within the configured segment transfer timeout.
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class SegmentUploadFailedException extends IOException {
19+
20+
/**
21+
* Creates a new SegmentUploadFailedException with the given detail message.
22+
*
23+
* @param message error message describing why the segment upload failed
24+
*/
25+
public SegmentUploadFailedException(String message) {
26+
super(message);
27+
}
28+
}

server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java

+26
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,21 @@ public class RemoteStoreSettings {
105105
Property.NodeScope
106106
);
107107

108+
/**
109+
* Controls timeout value while uploading segment files to remote segment store
110+
*/
111+
public static final Setting<TimeValue> CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting(
112+
"cluster.remote_store.segment.transfer_timeout",
113+
TimeValue.timeValueMinutes(30),
114+
TimeValue.timeValueMinutes(10),
115+
Property.NodeScope,
116+
Property.Dynamic
117+
);
118+
108119
private volatile TimeValue clusterRemoteTranslogBufferInterval;
109120
private volatile int minRemoteSegmentMetadataFiles;
110121
private volatile TimeValue clusterRemoteTranslogTransferTimeout;
122+
private volatile TimeValue clusterRemoteSegmentTransferTimeout;
111123
private volatile RemoteStoreEnums.PathType pathType;
112124
private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm;
113125
private volatile int maxRemoteTranslogReaders;
@@ -139,6 +151,12 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
139151

140152
maxRemoteTranslogReaders = CLUSTER_REMOTE_MAX_TRANSLOG_READERS.get(settings);
141153
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_TRANSLOG_READERS, this::setMaxRemoteTranslogReaders);
154+
155+
clusterRemoteSegmentTransferTimeout = CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING.get(settings);
156+
clusterSettings.addSettingsUpdateConsumer(
157+
CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING,
158+
this::setClusterRemoteSegmentTransferTimeout
159+
);
142160
}
143161

144162
public TimeValue getClusterRemoteTranslogBufferInterval() {
@@ -161,10 +179,18 @@ public TimeValue getClusterRemoteTranslogTransferTimeout() {
161179
return clusterRemoteTranslogTransferTimeout;
162180
}
163181

182+
/** Returns the current value of the segment transfer timeout, initialised from CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING. */
public TimeValue getClusterRemoteSegmentTransferTimeout() {
183+
return clusterRemoteSegmentTransferTimeout;
184+
}
185+
164186
/** Replaces the stored translog transfer timeout (a volatile field) with the supplied value. */
private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) {
165187
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
166188
}
167189

190+
/** Settings-update consumer registered in the constructor: replaces the stored segment transfer timeout with the supplied value. */
private void setClusterRemoteSegmentTransferTimeout(TimeValue clusterRemoteSegmentTransferTimeout) {
191+
this.clusterRemoteSegmentTransferTimeout = clusterRemoteSegmentTransferTimeout;
192+
}
193+
168194
@ExperimentalApi
169195
public RemoteStoreEnums.PathType getPathType() {
170196
return pathType;

server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java

+84-8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.opensearch.common.lease.Releasable;
2424
import org.opensearch.common.settings.ClusterSettings;
2525
import org.opensearch.common.settings.Settings;
26+
import org.opensearch.common.unit.TimeValue;
2627
import org.opensearch.core.action.ActionListener;
2728
import org.opensearch.core.index.shard.ShardId;
2829
import org.opensearch.index.engine.InternalEngineFactory;
@@ -34,6 +35,7 @@
3435
import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils;
3536
import org.opensearch.index.store.Store;
3637
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
38+
import org.opensearch.indices.DefaultRemoteStoreSettings;
3739
import org.opensearch.indices.RemoteStoreSettings;
3840
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
3941
import org.opensearch.indices.replication.common.ReplicationType;
@@ -91,7 +93,12 @@ public void setup(boolean primary, int numberOfDocs) throws IOException {
9193
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY);
9294
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);
9395
RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
94-
remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY, tracker);
96+
remoteStoreRefreshListener = new RemoteStoreRefreshListener(
97+
indexShard,
98+
SegmentReplicationCheckpointPublisher.EMPTY,
99+
tracker,
100+
DefaultRemoteStoreSettings.INSTANCE
101+
);
95102
}
96103

97104
private void indexDocs(int startDocId, int numberOfDocs) throws IOException {
@@ -176,7 +183,12 @@ public void testRemoteDirectoryInitThrowsException() throws IOException {
176183
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);
177184

178185
// Since the thrown IOException is caught in the constructor, ctor should be invoked successfully.
179-
new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY, mock(RemoteSegmentTransferTracker.class));
186+
new RemoteStoreRefreshListener(
187+
shard,
188+
SegmentReplicationCheckpointPublisher.EMPTY,
189+
mock(RemoteSegmentTransferTracker.class),
190+
DefaultRemoteStoreSettings.INSTANCE
191+
);
180192

181193
// Validate that the stream of metadata file of remoteMetadataDirectory has been opened only once and the
182194
// listFilesByPrefixInLexicographicOrder has been called twice.
@@ -371,6 +383,33 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
371383
assertNoLagAndTotalUploadsFailed(segmentTracker, 1);
372384
}
373385

386+
public void testSegmentUploadTimeout() throws Exception {
387+
// This covers the case where segment upload fails due to timeout; the last argument (testUploadTimeout = true) enables that path in the helper.
388+
int succeedOnAttempt = 1;
389+
// We spy on IndexShard.isPrimaryStarted() to validate that the remote refresh was attempted as many times as expected.
390+
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
391+
CountDownLatch successLatch = new CountDownLatch(2);
392+
Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
393+
succeedOnAttempt,
394+
refreshCountLatch,
395+
successLatch,
396+
1,
397+
new CountDownLatch(0),
398+
true,
399+
true
400+
);
401+
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
402+
assertBusy(() -> assertEquals(1, successLatch.getCount()));
403+
RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2();
404+
RemoteSegmentTransferTracker segmentTracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
405+
assertBusy(() -> {
406+
assertTrue(segmentTracker.getTotalUploadsFailed() > 1);
407+
assertTrue(segmentTracker.getTotalUploadsSucceeded() < 2);
408+
});
409+
// shut down the thread pool to avoid leaking threads (the mocked upload task may still be sleeping on it)
410+
indexShard.getThreadPool().shutdownNow();
411+
}
412+
374413
/**
375414
* Tests retry flow after snapshot and metadata files have been uploaded to remote store in the failed attempt.
376415
* Snapshot and metadata files created in failed attempt should not break retry.
@@ -470,6 +509,7 @@ public void testRefreshFailedDueToPrimaryTermMisMatch() throws Exception {
470509
successLatch,
471510
checkpointPublishSucceedOnAttempt,
472511
reachedCheckpointPublishLatch,
512+
false,
473513
false
474514
);
475515

@@ -521,7 +561,8 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
521561
successLatch,
522562
succeedCheckpointPublishOnAttempt,
523563
reachedCheckpointPublishLatch,
524-
true
564+
true,
565+
false
525566
);
526567
}
527568

@@ -531,7 +572,8 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
531572
CountDownLatch successLatch,
532573
int succeedCheckpointPublishOnAttempt,
533574
CountDownLatch reachedCheckpointPublishLatch,
534-
boolean mockPrimaryTerm
575+
boolean mockPrimaryTerm,
576+
boolean testUploadTimeout
535577
) throws IOException {
536578
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
537579
indexShard = newStartedShard(
@@ -565,9 +607,22 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
565607
// Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
566608
Store remoteStore = mock(Store.class);
567609
when(shard.remoteStore()).thenReturn(remoteStore);
568-
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
569-
(RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate())
570-
.getDelegate();
610+
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory;
611+
RemoteDirectory remoteDirectory = mock(RemoteDirectory.class);
612+
613+
if (testUploadTimeout) {
614+
remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(
615+
remoteDirectory,
616+
mock(RemoteDirectory.class),
617+
mock(RemoteStoreLockManager.class),
618+
indexShard.getThreadPool(),
619+
indexShard.shardId
620+
);
621+
} else {
622+
remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore()
623+
.directory()).getDelegate()).getDelegate();
624+
}
625+
571626
FilterDirectory remoteStoreFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(remoteSegmentStoreDirectory));
572627
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);
573628

@@ -639,7 +694,28 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
639694
RemoteStoreSettings remoteStoreSettings = mock(RemoteStoreSettings.class);
640695
when(remoteStoreSettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10);
641696
when(shard.getRemoteStoreSettings()).thenReturn(remoteStoreSettings);
642-
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker);
697+
if (testUploadTimeout) {
698+
when(remoteStoreSettings.getClusterRemoteSegmentTransferTimeout()).thenReturn(TimeValue.timeValueMillis(10));
699+
doAnswer(invocation -> {
700+
ActionListener<Void> actionListener = invocation.getArgument(5);
701+
indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> {
702+
try {
703+
Thread.sleep(30000);
704+
} catch (InterruptedException e) {
705+
logger.warn("copyFrom thread interrupted during sleep");
706+
}
707+
actionListener.onResponse(null);
708+
});
709+
return true;
710+
}).when(remoteDirectory).copyFrom(any(), any(), any(), any(), any(), any(ActionListener.class), any(Boolean.class));
711+
}
712+
713+
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(
714+
shard,
715+
emptyCheckpointPublisher,
716+
tracker,
717+
remoteStoreSettings
718+
);
643719
refreshListener.afterRefresh(true);
644720
return Tuple.tuple(refreshListener, remoteStoreStatsTrackerFactory);
645721
}

0 commit comments

Comments
 (0)