150
150
import org .opensearch .index .recovery .RecoveryStats ;
151
151
import org .opensearch .index .refresh .RefreshStats ;
152
152
import org .opensearch .index .remote .RemoteSegmentStats ;
153
+ import org .opensearch .index .remote .RemoteStorePathStrategy ;
153
154
import org .opensearch .index .remote .RemoteStoreStatsTrackerFactory ;
154
155
import org .opensearch .index .search .stats .SearchStats ;
155
156
import org .opensearch .index .search .stats .ShardSearchStats ;
@@ -2479,6 +2480,10 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
2479
2480
* Operations from the translog will be replayed to bring lucene up to date.
2480
2481
**/
2481
2482
public void openEngineAndRecoverFromTranslog () throws IOException {
2483
+ openEngineAndRecoverFromTranslog (true );
2484
+ }
2485
+
2486
+ public void openEngineAndRecoverFromTranslog (boolean syncFromRemote ) throws IOException {
2482
2487
recoveryState .validateCurrentStage (RecoveryState .Stage .INDEX );
2483
2488
maybeCheckIndex ();
2484
2489
recoveryState .setStage (RecoveryState .Stage .TRANSLOG );
@@ -2499,7 +2504,16 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
2499
2504
loadGlobalCheckpointToReplicationTracker ();
2500
2505
}
2501
2506
2502
- innerOpenEngineAndTranslog (replicationTracker );
2507
+ if (isSnapshotV2Restore ()) {
2508
+ translogConfig .setDownloadRemoteTranslogOnInit (false );
2509
+ }
2510
+
2511
+ innerOpenEngineAndTranslog (replicationTracker , syncFromRemote );
2512
+
2513
+ if (isSnapshotV2Restore ()) {
2514
+ translogConfig .setDownloadRemoteTranslogOnInit (true );
2515
+ }
2516
+
2503
2517
getEngine ().translogManager ()
2504
2518
.recoverFromTranslog (translogRecoveryRunner , getEngine ().getProcessedLocalCheckpoint (), Long .MAX_VALUE );
2505
2519
}
@@ -2561,7 +2575,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
2561
2575
if (shardRouting .primary ()) {
2562
2576
if (syncFromRemote ) {
2563
2577
syncRemoteTranslogAndUpdateGlobalCheckpoint ();
2564
- } else {
2578
+ } else if ( isSnapshotV2Restore () == false ) {
2565
2579
// we will enter this block when we do not want to recover from remote translog.
2566
2580
// currently only during snapshot restore, we are coming into this block.
2567
2581
// here, as while initiliazing remote translog we cannot skip downloading translog files,
@@ -2607,6 +2621,11 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
2607
2621
recoveryState .validateCurrentStage (RecoveryState .Stage .TRANSLOG );
2608
2622
}
2609
2623
2624
+ private boolean isSnapshotV2Restore () {
2625
+ return routingEntry ().recoverySource ().getType () == RecoverySource .Type .SNAPSHOT
2626
+ && ((SnapshotRecoverySource ) routingEntry ().recoverySource ()).getPinnedTimestamp () > 0 ;
2627
+ }
2628
+
2610
2629
private boolean assertSequenceNumbersInCommit () throws IOException {
2611
2630
final Map <String , String > userData = fetchUserData ();
2612
2631
assert userData .containsKey (SequenceNumbers .LOCAL_CHECKPOINT_KEY ) : "commit point doesn't contains a local checkpoint" ;
@@ -2892,7 +2911,12 @@ public void restoreFromSnapshotAndRemoteStore(
2892
2911
assert recoveryState .getRecoverySource ().getType () == RecoverySource .Type .SNAPSHOT : "invalid recovery type: "
2893
2912
+ recoveryState .getRecoverySource ();
2894
2913
StoreRecovery storeRecovery = new StoreRecovery (shardId , logger );
2895
- storeRecovery .recoverFromSnapshotAndRemoteStore (this , repository , repositoriesService , listener , threadPool );
2914
+ SnapshotRecoverySource recoverySource = (SnapshotRecoverySource ) recoveryState ().getRecoverySource ();
2915
+ if (recoverySource .getPinnedTimestamp () != 0 ) {
2916
+ storeRecovery .recoverShallowSnapshotV2 (this , repository , repositoriesService , listener , threadPool );
2917
+ } else {
2918
+ storeRecovery .recoverFromSnapshotAndRemoteStore (this , repository , repositoriesService , listener , threadPool );
2919
+ }
2896
2920
} catch (Exception e ) {
2897
2921
listener .onFailure (e );
2898
2922
}
@@ -5000,16 +5024,33 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
5000
5024
TranslogFactory translogFactory = translogFactorySupplier .apply (indexSettings , shardRouting );
5001
5025
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory ;
5002
5026
Repository repository = ((RemoteBlobStoreInternalTranslogFactory ) translogFactory ).getRepository ();
5027
+ syncTranslogFilesFromRemoteTranslog (
5028
+ repository ,
5029
+ shardId ,
5030
+ indexSettings .getRemoteStorePathStrategy (),
5031
+ indexSettings ().isTranslogMetadataEnabled (),
5032
+ 0
5033
+ );
5034
+ }
5035
+
5036
+ public void syncTranslogFilesFromRemoteTranslog (
5037
+ Repository repository ,
5038
+ ShardId shardId ,
5039
+ RemoteStorePathStrategy remoteStorePathStrategy ,
5040
+ boolean isTranslogMetadataEnabled ,
5041
+ long timestamp
5042
+ ) throws IOException {
5003
5043
RemoteFsTranslog .download (
5004
5044
repository ,
5005
5045
shardId ,
5006
5046
getThreadPool (),
5007
5047
shardPath ().resolveTranslog (),
5008
- indexSettings . getRemoteStorePathStrategy () ,
5048
+ remoteStorePathStrategy ,
5009
5049
remoteStoreSettings ,
5010
5050
logger ,
5011
5051
shouldSeedRemoteStore (),
5012
- indexSettings ().isTranslogMetadataEnabled ()
5052
+ isTranslogMetadataEnabled ,
5053
+ timestamp
5013
5054
);
5014
5055
}
5015
5056
@@ -5098,15 +5139,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
5098
5139
* Downloads segments from given remote segment store for a specific commit.
5099
5140
* @param overrideLocal flag to override local segment files with those in remote store
5100
5141
* @param sourceRemoteDirectory RemoteSegmentDirectory Instance from which we need to sync segments
5101
- * @param primaryTerm Primary Term for shard at the time of commit operation for which we are syncing segments
5102
- * @param commitGeneration commit generation at the time of commit operation for which we are syncing segments
5103
5142
* @throws IOException if exception occurs while reading segments from remote store
5104
5143
*/
5105
5144
public void syncSegmentsFromGivenRemoteSegmentStore (
5106
5145
boolean overrideLocal ,
5107
5146
RemoteSegmentStoreDirectory sourceRemoteDirectory ,
5108
- long primaryTerm ,
5109
- long commitGeneration
5147
+ RemoteSegmentMetadata remoteSegmentMetadata ,
5148
+ boolean pinnedTimestamp
5110
5149
) throws IOException {
5111
5150
logger .trace ("Downloading segments from given remote segment store" );
5112
5151
RemoteSegmentStoreDirectory remoteDirectory = null ;
@@ -5142,12 +5181,29 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
5142
5181
overrideLocal ,
5143
5182
() -> {}
5144
5183
);
5145
- if (segmentsNFile != null ) {
5184
+ if (pinnedTimestamp ) {
5185
+ final SegmentInfos infosSnapshot = store .buildSegmentInfos (
5186
+ remoteSegmentMetadata .getSegmentInfosBytes (),
5187
+ remoteSegmentMetadata .getGeneration ()
5188
+ );
5189
+ long processedLocalCheckpoint = Long .parseLong (infosSnapshot .getUserData ().get (LOCAL_CHECKPOINT_KEY ));
5190
+ // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
5191
+ // Extra segments will be wiped on engine open.
5192
+ for (String file : List .of (store .directory ().listAll ())) {
5193
+ if (file .startsWith (IndexFileNames .SEGMENTS )) {
5194
+ store .deleteQuiet (file );
5195
+ }
5196
+ }
5197
+ assert Arrays .stream (store .directory ().listAll ()).filter (f -> f .startsWith (IndexFileNames .SEGMENTS )).findAny ().isEmpty ()
5198
+ : "There should not be any segments file in the dir" ;
5199
+ store .commitSegmentInfos (infosSnapshot , processedLocalCheckpoint , processedLocalCheckpoint );
5200
+ } else if (segmentsNFile != null ) {
5146
5201
try (
5147
5202
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput (
5148
5203
storeDirectory .openInput (segmentsNFile , IOContext .DEFAULT )
5149
5204
)
5150
5205
) {
5206
+ long commitGeneration = SegmentInfos .generationFromSegmentsFileName (segmentsNFile );
5151
5207
SegmentInfos infosSnapshot = SegmentInfos .readCommit (store .directory (), indexInput , commitGeneration );
5152
5208
long processedLocalCheckpoint = Long .parseLong (infosSnapshot .getUserData ().get (LOCAL_CHECKPOINT_KEY ));
5153
5209
if (remoteStore != null ) {
0 commit comments