66
66
import org .opensearch .common .Nullable ;
67
67
import org .opensearch .common .Numbers ;
68
68
import org .opensearch .common .Priority ;
69
+ import org .opensearch .common .Randomness ;
69
70
import org .opensearch .common .SetOnce ;
70
71
import org .opensearch .common .UUIDs ;
71
72
import org .opensearch .common .blobstore .BlobContainer ;
@@ -1460,6 +1461,10 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
1460
1461
ActionListener <Void > listener
1461
1462
) {
1462
1463
final List <Tuple <BlobPath , String >> filesToDelete = resolveFilesToDelete (oldRepositoryData , snapshotIds , deleteResults );
1464
+ long startTimeNs = System .nanoTime ();
1465
+ Randomness .shuffle (filesToDelete );
1466
+ logger .debug ("[{}] shuffled the filesToDelete with timeElapsedNs={}" , metadata .name (), (System .nanoTime () - startTimeNs ));
1467
+
1463
1468
if (filesToDelete .isEmpty ()) {
1464
1469
listener .onResponse (null );
1465
1470
return ;
@@ -1477,8 +1482,8 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
1477
1482
staleFilesToDeleteInBatch .size ()
1478
1483
);
1479
1484
1480
- // Start as many workers as fit into the snapshot pool at once at the most
1481
- final int workers = Math .min (threadPool .info (ThreadPool .Names .SNAPSHOT ).getMax (), staleFilesToDeleteInBatch .size ());
1485
+ // Start as many workers as fit into the snapshot_deletion pool at once at the most
1486
+ final int workers = Math .min (threadPool .info (ThreadPool .Names .SNAPSHOT_DELETION ).getMax (), staleFilesToDeleteInBatch .size ());
1482
1487
for (int i = 0 ; i < workers ; ++i ) {
1483
1488
executeStaleShardDelete (staleFilesToDeleteInBatch , remoteStoreLockManagerFactory , groupedListener );
1484
1489
}
@@ -1582,7 +1587,7 @@ private void executeStaleShardDelete(
1582
1587
if (filesToDelete == null ) {
1583
1588
return ;
1584
1589
}
1585
- threadPool .executor (ThreadPool .Names .SNAPSHOT ).execute (ActionRunnable .wrap (listener , l -> {
1590
+ threadPool .executor (ThreadPool .Names .SNAPSHOT_DELETION ).execute (ActionRunnable .wrap (listener , l -> {
1586
1591
try {
1587
1592
// filtering files for which remote store lock release and cleanup succeeded,
1588
1593
// remaining files for which it failed will be retried in next snapshot delete run.
@@ -1646,7 +1651,7 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(
1646
1651
ActionListener <Collection <ShardSnapshotMetaDeleteResult >> onAllShardsCompleted
1647
1652
) {
1648
1653
1649
- final Executor executor = threadPool .executor (ThreadPool .Names .SNAPSHOT );
1654
+ final Executor executor = threadPool .executor (ThreadPool .Names .SNAPSHOT_DELETION );
1650
1655
final List <IndexId > indices = oldRepositoryData .indicesToUpdateAfterRemovingSnapshot (snapshotIds );
1651
1656
1652
1657
if (indices .isEmpty ()) {
@@ -1836,7 +1841,7 @@ private void cleanupStaleBlobs(
1836
1841
listener .onResponse (deleteResult );
1837
1842
}, listener ::onFailure ), 2 );
1838
1843
1839
- final Executor executor = threadPool .executor (ThreadPool .Names .SNAPSHOT );
1844
+ final Executor executor = threadPool .executor (ThreadPool .Names .SNAPSHOT_DELETION );
1840
1845
final List <String > staleRootBlobs = staleRootBlobs (newRepoData , rootBlobs .keySet ());
1841
1846
if (staleRootBlobs .isEmpty ()) {
1842
1847
groupedListener .onResponse (DeleteResult .ZERO );
@@ -2049,7 +2054,7 @@ void cleanupStaleIndices(
2049
2054
2050
2055
// Start as many workers as fit into the snapshot_deletion pool at once at the most
2051
2056
final int workers = Math .min (
2052
- threadPool .info (ThreadPool .Names .SNAPSHOT ).getMax (),
2057
+ threadPool .info (ThreadPool .Names .SNAPSHOT_DELETION ).getMax (),
2053
2058
foundIndices .size () - survivingIndexIds .size ()
2054
2059
);
2055
2060
for (int i = 0 ; i < workers ; ++i ) {
@@ -2107,7 +2112,7 @@ private void executeOneStaleIndexDelete(
2107
2112
return ;
2108
2113
}
2109
2114
final String indexSnId = indexEntry .getKey ();
2110
- threadPool .executor (ThreadPool .Names .SNAPSHOT ).execute (ActionRunnable .supply (listener , () -> {
2115
+ threadPool .executor (ThreadPool .Names .SNAPSHOT_DELETION ).execute (ActionRunnable .supply (listener , () -> {
2111
2116
try {
2112
2117
logger .debug ("[{}] Found stale index [{}]. Cleaning it up" , metadata .name (), indexSnId );
2113
2118
List <String > matchingShardPaths = findMatchingShardPaths (indexSnId , snapshotShardPaths );
@@ -2475,9 +2480,10 @@ public void finalizeSnapshot(
2475
2480
repositoryUpdatePriority ,
2476
2481
ActionListener .wrap (newRepoData -> {
2477
2482
if (writeShardGens ) {
2478
- cleanupOldShardGens (existingRepositoryData , updatedRepositoryData );
2483
+ cleanupOldShardGens (existingRepositoryData , updatedRepositoryData , newRepoData , listener );
2484
+ } else {
2485
+ listener .onResponse (newRepoData );
2479
2486
}
2480
- listener .onResponse (newRepoData );
2481
2487
}, onUpdateFailure )
2482
2488
);
2483
2489
}, onUpdateFailure ), 2 + indices .size ());
@@ -2642,7 +2648,12 @@ private void logShardPathsOperationWarning(IndexId indexId, SnapshotId snapshotI
2642
2648
}
2643
2649
2644
2650
// Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data
2645
- private void cleanupOldShardGens (RepositoryData existingRepositoryData , RepositoryData updatedRepositoryData ) {
2651
+ private void cleanupOldShardGens (
2652
+ RepositoryData existingRepositoryData ,
2653
+ RepositoryData updatedRepositoryData ,
2654
+ RepositoryData newRepositoryData ,
2655
+ ActionListener <RepositoryData > listener
2656
+ ) {
2646
2657
final List <String > toDelete = new ArrayList <>();
2647
2658
updatedRepositoryData .shardGenerations ()
2648
2659
.obsoleteShardGenerations (existingRepositoryData .shardGenerations ())
@@ -2651,10 +2662,62 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito
2651
2662
(shardId , oldGen ) -> toDelete .add (shardPath (indexId , shardId ).buildAsString () + INDEX_FILE_PREFIX + oldGen )
2652
2663
)
2653
2664
);
2665
+ if (toDelete .isEmpty ()) {
2666
+ listener .onResponse (newRepositoryData );
2667
+ return ;
2668
+ }
2654
2669
try {
2655
- deleteFromContainer (rootBlobContainer (), toDelete );
2670
+ AtomicInteger counter = new AtomicInteger ();
2671
+ Collection <List <String >> subList = toDelete .stream ()
2672
+ .collect (Collectors .groupingBy (it -> counter .getAndIncrement () / maxShardBlobDeleteBatch ))
2673
+ .values ();
2674
+ final BlockingQueue <List <String >> staleFilesToDeleteInBatch = new LinkedBlockingQueue <>(subList );
2675
+ logger .info (
2676
+ "[{}] cleanupOldShardGens toDeleteSize={} groupSize={}" ,
2677
+ metadata .name (),
2678
+ toDelete .size (),
2679
+ staleFilesToDeleteInBatch .size ()
2680
+ );
2681
+ final GroupedActionListener <Void > groupedListener = new GroupedActionListener <>(ActionListener .wrap (r -> {
2682
+ logger .info ("[{}] completed cleanupOldShardGens" , metadata .name ());
2683
+ listener .onResponse (newRepositoryData );
2684
+ }, ex -> {
2685
+ logger .error (new ParameterizedMessage ("[{}] exception in cleanupOldShardGens" , metadata .name ()), ex );
2686
+ listener .onResponse (newRepositoryData );
2687
+ }), staleFilesToDeleteInBatch .size ());
2688
+
2689
+ // Start as many workers as fit into the snapshot pool at once at the most
2690
+ final int workers = Math .min (threadPool .info (ThreadPool .Names .SNAPSHOT_DELETION ).getMax (), staleFilesToDeleteInBatch .size ());
2691
+ for (int i = 0 ; i < workers ; ++i ) {
2692
+ executeOldShardGensCleanup (staleFilesToDeleteInBatch , groupedListener );
2693
+ }
2656
2694
} catch (Exception e ) {
2657
- logger .warn ("Failed to clean up old shard generation blobs" , e );
2695
+ logger .warn (new ParameterizedMessage (" [{}] Failed to clean up old shard generation blobs" , metadata .name ()), e );
2696
+ listener .onResponse (newRepositoryData );
2697
+ }
2698
+ }
2699
+
2700
+ private void executeOldShardGensCleanup (BlockingQueue <List <String >> staleFilesToDeleteInBatch , GroupedActionListener <Void > listener )
2701
+ throws InterruptedException {
2702
+ List <String > filesToDelete = staleFilesToDeleteInBatch .poll (0L , TimeUnit .MILLISECONDS );
2703
+ if (filesToDelete != null ) {
2704
+ threadPool .executor (ThreadPool .Names .SNAPSHOT_DELETION ).execute (ActionRunnable .wrap (listener , l -> {
2705
+ try {
2706
+ deleteFromContainer (rootBlobContainer (), filesToDelete );
2707
+ l .onResponse (null );
2708
+ } catch (Exception e ) {
2709
+ logger .warn (
2710
+ () -> new ParameterizedMessage (
2711
+ "[{}] Failed to delete following blobs during cleanupOldFiles : {}" ,
2712
+ metadata .name (),
2713
+ filesToDelete
2714
+ ),
2715
+ e
2716
+ );
2717
+ l .onFailure (e );
2718
+ }
2719
+ executeOldShardGensCleanup (staleFilesToDeleteInBatch , listener );
2720
+ }));
2658
2721
}
2659
2722
}
2660
2723
@@ -2771,10 +2834,11 @@ public long getRemoteDownloadThrottleTimeInNanos() {
2771
2834
}
2772
2835
2773
2836
protected void assertSnapshotOrGenericThread () {
2774
- assert Thread .currentThread ().getName ().contains ('[' + ThreadPool .Names .SNAPSHOT + ']' )
2837
+ assert Thread .currentThread ().getName ().contains ('[' + ThreadPool .Names .SNAPSHOT_DELETION + ']' )
2838
+ || Thread .currentThread ().getName ().contains ('[' + ThreadPool .Names .SNAPSHOT + ']' )
2775
2839
|| Thread .currentThread ().getName ().contains ('[' + ThreadPool .Names .GENERIC + ']' ) : "Expected current thread ["
2776
2840
+ Thread .currentThread ()
2777
- + "] to be the snapshot or generic thread." ;
2841
+ + "] to be the snapshot_deletion or snapshot or generic thread." ;
2778
2842
}
2779
2843
2780
2844
@ Override
0 commit comments