Skip to content

Commit 56cca2a

Browse files
author
Sachin Kale
committed
Restore snapshot changes for V2
Signed-off-by: Sachin Kale <[email protected]>
1 parent 13c277d commit 56cca2a

File tree

12 files changed

+298
-55
lines changed

12 files changed

+298
-55
lines changed

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(
447447
// could not be taken due to partial being set to false.
448448
shardSnapshotStatus = IndexShardSnapshotStatus.newFailed("skipped");
449449
} else {
450-
shardSnapshotStatus = repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), indexId, shardId);
450+
shardSnapshotStatus = repository.getShardSnapshotStatus(snapshotInfo, indexId, shardId);
451451
}
452452
shardStatus.put(shardId, shardSnapshotStatus);
453453
}

server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java

+43-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import java.io.IOException;
4949
import java.util.Objects;
5050

51+
import static org.opensearch.Version.V_2_17_0;
52+
5153
/**
5254
* Represents the recovery source of a shard. Available recovery types are:
5355
* <p>
@@ -265,8 +267,14 @@ public static class SnapshotRecoverySource extends RecoverySource {
265267
private final boolean remoteStoreIndexShallowCopy;
266268
private final String sourceRemoteStoreRepository;
267269

270+
private final long pinnedTimestamp;
271+
268272
/**
 * Convenience constructor for a snapshot recovery source without any optional attributes.
 * The added (+) line delegates with both boolean flags set to {@code false}, a {@code null}
 * source remote store repository and a pinned timestamp of {@code 0L}.
 *
 * @param restoreUUID identifier of the restore operation
 * @param snapshot    snapshot being restored
 * @param version     version passed through to the delegated constructor
 * @param indexId     id of the index being restored
 */
public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId) {
269-
this(restoreUUID, snapshot, version, indexId, false, false, null);
273+
this(restoreUUID, snapshot, version, indexId, false, false, null, 0L);
274+
}
275+
276+
/**
 * Convenience constructor that sets only the pinned timestamp: delegates with both boolean
 * flags set to {@code false} and a {@code null} source remote store repository.
 *
 * @param restoreUUID     identifier of the restore operation
 * @param snapshot        snapshot being restored
 * @param version         version passed through to the delegated constructor
 * @param indexId         id of the index being restored
 * @param pinnedTimestamp pinned timestamp to record on this recovery source
 */
public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId, long pinnedTimestamp) {
277+
this(restoreUUID, snapshot, version, indexId, false, false, null, pinnedTimestamp);
270278
}
271279

272280
public SnapshotRecoverySource(
@@ -285,6 +293,27 @@ public SnapshotRecoverySource(
285293
this.isSearchableSnapshot = isSearchableSnapshot;
286294
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
287295
this.sourceRemoteStoreRepository = sourceRemoteStoreRepository;
296+
this.pinnedTimestamp = 0L;
297+
}
298+
299+
/**
 * Full constructor: stores every argument in the corresponding field, including the
 * pinned timestamp.
 *
 * @param restoreUUID                 identifier of the restore operation (stored as-is, not null-checked here)
 * @param snapshot                    snapshot being restored; must not be {@code null}
 * @param version                     version associated with the recovery source; must not be {@code null}
 * @param indexId                     id of the index being restored; must not be {@code null}
 * @param isSearchableSnapshot        stored in the {@code isSearchableSnapshot} field
 * @param remoteStoreIndexShallowCopy stored in the {@code remoteStoreIndexShallowCopy} field
 * @param sourceRemoteStoreRepository source remote store repository name, may be {@code null}
 * @param pinnedTimestamp             pinned timestamp; the other visible constructors use {@code 0L}
 *                                    when none is supplied (unit not visible here — TODO confirm)
 * @throws NullPointerException if {@code snapshot}, {@code version} or {@code indexId} is {@code null}
 */
public SnapshotRecoverySource(
300+
String restoreUUID,
301+
Snapshot snapshot,
302+
Version version,
303+
IndexId indexId,
304+
boolean isSearchableSnapshot,
305+
boolean remoteStoreIndexShallowCopy,
306+
@Nullable String sourceRemoteStoreRepository,
307+
long pinnedTimestamp
308+
) {
309+
this.restoreUUID = restoreUUID;
310+
this.snapshot = Objects.requireNonNull(snapshot);
311+
this.version = Objects.requireNonNull(version);
312+
this.index = Objects.requireNonNull(indexId);
313+
this.isSearchableSnapshot = isSearchableSnapshot;
314+
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
315+
this.sourceRemoteStoreRepository = sourceRemoteStoreRepository;
316+
this.pinnedTimestamp = pinnedTimestamp;
288317
}
289318

290319
SnapshotRecoverySource(StreamInput in) throws IOException {
@@ -304,6 +333,11 @@ public SnapshotRecoverySource(
304333
remoteStoreIndexShallowCopy = false;
305334
sourceRemoteStoreRepository = null;
306335
}
336+
if (in.getVersion().onOrAfter(V_2_17_0)) {
337+
pinnedTimestamp = in.readLong();
338+
} else {
339+
pinnedTimestamp = 0L;
340+
}
307341
}
308342

309343
public String restoreUUID() {
@@ -340,6 +374,10 @@ public boolean remoteStoreIndexShallowCopy() {
340374
return remoteStoreIndexShallowCopy;
341375
}
342376

377+
/**
 * Returns the pinned timestamp carried by this recovery source.
 *
 * @return the stored pinned timestamp; {@code 0L} when the source was built without one or
 *         was read from a stream older than {@code V_2_17_0}
 */
public long getPinnedTimestamp() {
378+
return pinnedTimestamp;
379+
}
380+
343381
@Override
344382
protected void writeAdditionalFields(StreamOutput out) throws IOException {
345383
out.writeString(restoreUUID);
@@ -353,6 +391,10 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException {
353391
out.writeBoolean(remoteStoreIndexShallowCopy);
354392
out.writeOptionalString(sourceRemoteStoreRepository);
355393
}
394+
if (out.getVersion().onOrAfter(V_2_17_0)) {
395+
out.writeLong(pinnedTimestamp);
396+
}
397+
356398
}
357399

358400
@Override

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+66-10
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@
150150
import org.opensearch.index.recovery.RecoveryStats;
151151
import org.opensearch.index.refresh.RefreshStats;
152152
import org.opensearch.index.remote.RemoteSegmentStats;
153+
import org.opensearch.index.remote.RemoteStorePathStrategy;
153154
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
154155
import org.opensearch.index.search.stats.SearchStats;
155156
import org.opensearch.index.search.stats.ShardSearchStats;
@@ -2479,6 +2480,10 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
24792480
* Operations from the translog will be replayed to bring lucene up to date.
24802481
**/
24812482
public void openEngineAndRecoverFromTranslog() throws IOException {
2483+
// Delegates to the boolean overload with syncFromRemote = true.
openEngineAndRecoverFromTranslog(true);
2484+
}
2485+
2486+
public void openEngineAndRecoverFromTranslog(boolean syncFromRemote) throws IOException {
24822487
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
24832488
maybeCheckIndex();
24842489
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
@@ -2499,7 +2504,16 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
24992504
loadGlobalCheckpointToReplicationTracker();
25002505
}
25012506

2502-
innerOpenEngineAndTranslog(replicationTracker);
2507+
if (isSnapshotV2Restore()) {
2508+
translogConfig.setPinnedTimestamp(((SnapshotRecoverySource) routingEntry().recoverySource()).getPinnedTimestamp());
2509+
}
2510+
2511+
innerOpenEngineAndTranslog(replicationTracker, syncFromRemote);
2512+
2513+
if (isSnapshotV2Restore()) {
2514+
translogConfig.setPinnedTimestamp(0);
2515+
}
2516+
25032517
getEngine().translogManager()
25042518
.recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE);
25052519
}
@@ -2561,7 +2575,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
25612575
if (shardRouting.primary()) {
25622576
if (syncFromRemote) {
25632577
syncRemoteTranslogAndUpdateGlobalCheckpoint();
2564-
} else {
2578+
} else if (isSnapshotV2Restore() == false) {
25652579
// we will enter this block when we do not want to recover from remote translog.
25662580
// currently only during snapshot restore, we are coming into this block.
25672581
// here, as while initializing remote translog we cannot skip downloading translog files,
@@ -2607,6 +2621,11 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
26072621
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
26082622
}
26092623

2624+
/**
 * Tells whether this shard is being restored from a snapshot that carries a pinned timestamp.
 * The type check runs first, so the cast to {@link SnapshotRecoverySource} is only evaluated
 * for recovery sources of type {@code SNAPSHOT}.
 * <p>
 * NOTE(review): this method tests {@code > 0}, while {@code restoreFromSnapshotAndRemoteStore}
 * tests {@code != 0}; the two differ for negative timestamps — confirm which is intended.
 *
 * @return {@code true} if the routing entry's recovery source is of type {@code SNAPSHOT} and
 *         its pinned timestamp is greater than zero
 */
private boolean isSnapshotV2Restore() {
2625+
return routingEntry().recoverySource().getType() == RecoverySource.Type.SNAPSHOT
2626+
&& ((SnapshotRecoverySource) routingEntry().recoverySource()).getPinnedTimestamp() > 0;
2627+
}
2628+
26102629
private boolean assertSequenceNumbersInCommit() throws IOException {
26112630
final Map<String, String> userData = fetchUserData();
26122631
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
@@ -2892,7 +2911,12 @@ public void restoreFromSnapshotAndRemoteStore(
28922911
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: "
28932912
+ recoveryState.getRecoverySource();
28942913
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
2895-
storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool);
2914+
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState().getRecoverySource();
2915+
if (recoverySource.getPinnedTimestamp() != 0) {
2916+
storeRecovery.recoverShallowSnapshotV2(this, repository, repositoriesService, listener, threadPool);
2917+
} else {
2918+
storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool);
2919+
}
28962920
} catch (Exception e) {
28972921
listener.onFailure(e);
28982922
}
@@ -5000,16 +5024,33 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
50005024
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
50015025
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
50025026
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
5027+
syncTranslogFilesFromRemoteTranslog(
5028+
repository,
5029+
shardId,
5030+
indexSettings.getRemoteStorePathStrategy(),
5031+
indexSettings().isTranslogMetadataEnabled(),
5032+
0
5033+
);
5034+
}
5035+
5036+
/**
 * Downloads translog files via {@code RemoteFsTranslog.download}, using caller-supplied source
 * coordinates instead of this shard's own index settings. The thread pool, local translog path,
 * {@code remoteStoreSettings}, logger and {@code shouldSeedRemoteStore()} value still come from
 * this shard.
 *
 * @param repository                repository passed to the download call
 * @param shardId                   shard id passed to the download call (may differ from this shard's id)
 * @param remoteStorePathStrategy   path strategy passed to the download call
 * @param isTranslogMetadataEnabled translog-metadata flag passed to the download call
 * @param timestamp                 timestamp forwarded to the download call; the no-argument
 *                                  overload passes {@code 0}
 * @throws IOException declared by this method; presumably propagated from the download call
 */
public void syncTranslogFilesFromRemoteTranslog(
5037+
Repository repository,
5038+
ShardId shardId,
5039+
RemoteStorePathStrategy remoteStorePathStrategy,
5040+
boolean isTranslogMetadataEnabled,
5041+
long timestamp
5042+
) throws IOException {
50035043
RemoteFsTranslog.download(
50045044
repository,
50055045
shardId,
50065046
getThreadPool(),
50075047
shardPath().resolveTranslog(),
5008-
indexSettings.getRemoteStorePathStrategy(),
5048+
remoteStorePathStrategy,
50095049
remoteStoreSettings,
50105050
logger,
50115051
shouldSeedRemoteStore(),
5012-
indexSettings().isTranslogMetadataEnabled()
5052+
isTranslogMetadataEnabled,
5053+
timestamp
50135054
);
50145055
}
50155056

@@ -5098,15 +5139,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
50985139
* Downloads segments from given remote segment store for a specific commit.
50995140
* @param overrideLocal flag to override local segment files with those in remote store
51005141
* @param sourceRemoteDirectory RemoteSegmentDirectory Instance from which we need to sync segments
5101-
* @param primaryTerm Primary Term for shard at the time of commit operation for which we are syncing segments
5102-
* @param commitGeneration commit generation at the time of commit operation for which we are syncing segments
51035142
* @throws IOException if exception occurs while reading segments from remote store
51045143
*/
51055144
public void syncSegmentsFromGivenRemoteSegmentStore(
51065145
boolean overrideLocal,
51075146
RemoteSegmentStoreDirectory sourceRemoteDirectory,
5108-
long primaryTerm,
5109-
long commitGeneration
5147+
RemoteSegmentMetadata remoteSegmentMetadata,
5148+
boolean pinnedTimestamp
51105149
) throws IOException {
51115150
logger.trace("Downloading segments from given remote segment store");
51125151
RemoteSegmentStoreDirectory remoteDirectory = null;
@@ -5129,12 +5168,29 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
51295168
overrideLocal,
51305169
() -> {}
51315170
);
5132-
if (segmentsNFile != null) {
5171+
if (pinnedTimestamp) {
5172+
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
5173+
remoteSegmentMetadata.getSegmentInfosBytes(),
5174+
remoteSegmentMetadata.getGeneration()
5175+
);
5176+
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
5177+
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
5178+
// Extra segments will be wiped on engine open.
5179+
for (String file : List.of(store.directory().listAll())) {
5180+
if (file.startsWith(IndexFileNames.SEGMENTS)) {
5181+
store.deleteQuiet(file);
5182+
}
5183+
}
5184+
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
5185+
: "There should not be any segments file in the dir";
5186+
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
5187+
} else if (segmentsNFile != null) {
51335188
try (
51345189
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
51355190
storeDirectory.openInput(segmentsNFile, IOContext.DEFAULT)
51365191
)
51375192
) {
5193+
long commitGeneration = SegmentInfos.generationFromSegmentsFileName(segmentsNFile);
51385194
SegmentInfos infosSnapshot = SegmentInfos.readCommit(store.directory(), indexInput, commitGeneration);
51395195
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
51405196
if (remoteStore != null) {

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

+79-3
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,15 @@
5858
import org.opensearch.index.engine.Engine;
5959
import org.opensearch.index.engine.EngineException;
6060
import org.opensearch.index.mapper.MapperService;
61+
import org.opensearch.index.remote.RemoteStorePathStrategy;
62+
import org.opensearch.index.remote.RemoteStoreUtils;
6163
import org.opensearch.index.seqno.SequenceNumbers;
6264
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
6365
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
6466
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
6567
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
6668
import org.opensearch.index.store.Store;
69+
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
6770
import org.opensearch.index.translog.Checkpoint;
6871
import org.opensearch.index.translog.Translog;
6972
import org.opensearch.index.translog.TranslogHeader;
@@ -72,6 +75,7 @@
7275
import org.opensearch.repositories.IndexId;
7376
import org.opensearch.repositories.RepositoriesService;
7477
import org.opensearch.repositories.Repository;
78+
import org.opensearch.repositories.RepositoryData;
7579
import org.opensearch.threadpool.ThreadPool;
7680

7781
import java.io.IOException;
@@ -405,14 +409,14 @@ void recoverFromSnapshotAndRemoteStore(
405409
shardId,
406410
shallowCopyShardMetadata.getRemoteStorePathStrategy()
407411
);
408-
sourceRemoteDirectory.initializeToSpecificCommit(
412+
RemoteSegmentMetadata remoteSegmentMetadata = sourceRemoteDirectory.initializeToSpecificCommit(
409413
primaryTerm,
410414
commitGeneration,
411415
recoverySource.snapshot().getSnapshotId().getUUID()
412416
);
413-
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, primaryTerm, commitGeneration);
417+
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, remoteSegmentMetadata, false);
414418
final Store store = indexShard.store();
415-
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) {
419+
if (indexShard.indexSettings.isRemoteStoreEnabled() == false) {
416420
bootstrap(indexShard, store);
417421
} else {
418422
bootstrapForSnapshot(indexShard, store);
@@ -441,6 +445,78 @@ void recoverFromSnapshotAndRemoteStore(
441445
}
442446
}
443447

448+
/**
 * Recovers a shard from a snapshot whose recovery source carries a non-zero pinned timestamp.
 * <p>
 * Steps, in order: run pre-recovery and prepare for index recovery; fetch the repository data;
 * resolve the index metadata stored with the snapshot; build a remote segment directory for the
 * source index and initialize it to the pinned timestamp; sync segments and then translog files
 * for that timestamp; open the engine and replay the translog; fill sequence number gaps;
 * finalize and post-process the recovery.
 * <p>
 * NOTE(review): when {@code canRecover(indexShard)} returns {@code false}, this method returns
 * without invoking the listener — confirm that this is intended.
 *
 * @param indexShard          shard being recovered
 * @param repository          snapshot repository, used for repository data and snapshot index metadata
 * @param repositoriesService used to create the remote segment directory and to look up the
 *                            remote store repository for the translog download
 * @param listener            receives {@code true} on success, or the failure otherwise
 * @param threadPool          thread pool handed to the remote segment directory factory
 */
void recoverShallowSnapshotV2(
449+
final IndexShard indexShard,
450+
Repository repository,
451+
RepositoriesService repositoriesService,
452+
ActionListener<Boolean> listener,
453+
ThreadPool threadPool
454+
) {
455+
try {
456+
if (canRecover(indexShard)) {
457+
indexShard.preRecovery();
458+
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
459+
assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType;
460+
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource();
461+
indexShard.prepareForIndexRecovery();
462+
463+
assert recoverySource.getPinnedTimestamp() != 0;
464+
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
465+
repository.getRepositoryData(repositoryDataListener);
466+
// The rest of the recovery runs once the repository data is available.
repositoryDataListener.whenComplete(repositoryData -> {
467+
IndexId indexId = repositoryData.resolveIndexId(recoverySource.index().getName());
468+
IndexMetadata prevIndexMetadata = repository.getSnapshotIndexMetaData(
469+
repositoryData,
470+
recoverySource.snapshot().getSnapshotId(),
471+
indexId
472+
);
473+
RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(
474+
() -> repositoriesService,
475+
threadPool
476+
);
477+
String remoteStoreRepository = ((SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource())
478+
.sourceRemoteStoreRepository();
479+
// No repository on the recovery source: fall back to the remote translog repository
// setting in the snapshot's index metadata.
// NOTE(review): this one name is then used for both the segment directory and the
// translog download below — confirm segments and translog share a repository.
if (remoteStoreRepository == null) {
480+
remoteStoreRepository = IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.get(prevIndexMetadata.getSettings());
481+
}
482+
RemoteStorePathStrategy remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(prevIndexMetadata);
483+
// The directory is keyed by the UUID of the index recorded in the snapshot metadata.
RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory(
484+
remoteStoreRepository,
485+
prevIndexMetadata.getIndexUUID(),
486+
shardId,
487+
remoteStorePathStrategy
488+
);
489+
RemoteSegmentMetadata remoteSegmentMetadata = sourceRemoteDirectory.initializeToSpecificTimestamp(
490+
recoverySource.getPinnedTimestamp()
491+
);
492+
493+
// Last argument (pinnedTimestamp) is true for this code path.
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, remoteSegmentMetadata, true);
494+
// Translog is downloaded for a ShardId built from the snapshot's index, at the same pinned timestamp.
indexShard.syncTranslogFilesFromRemoteTranslog(
495+
repositoriesService.repository(remoteStoreRepository),
496+
new ShardId(prevIndexMetadata.getIndex(), shardId.id()),
497+
remoteStorePathStrategy,
498+
RemoteStoreUtils.determineTranslogMetadataEnabled(prevIndexMetadata),
499+
recoverySource.getPinnedTimestamp()
500+
);
501+
502+
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
503+
writeEmptyRetentionLeasesFile(indexShard);
504+
indexShard.recoveryState().getIndex().setFileDetailsComplete();
505+
// syncFromRemote = false
indexShard.openEngineAndRecoverFromTranslog(false);
506+
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
507+
indexShard.finalizeRecovery();
508+
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
509+
indexShard.waitForRemoteStoreSync();
510+
}
511+
indexShard.postRecovery("post recovery from remote_store");
512+
listener.onResponse(true);
513+
}, listener::onFailure);
514+
}
515+
} catch (Exception e) {
516+
listener.onFailure(e);
517+
}
518+
}
519+
444520
private boolean canRecover(IndexShard indexShard) {
445521
if (indexShard.state() == IndexShardState.CLOSED) {
446522
// got closed on us, just ignore this recovery

0 commit comments

Comments
 (0)