From 8dfa60d218accad78501952d9e1e41e92cb3e4d2 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Thu, 12 Jul 2018 17:20:31 +0200 Subject: [PATCH] enforcing access to blobStore / blobContainer only to snapshot and generic threads --- .../repositories/RepositoriesService.java | 38 ++++++++++--------- .../blobstore/BlobStoreRepository.java | 5 ++- .../blobstore/BlobStoreRepositoryTests.java | 31 ++++++++++++++- .../SharedClusterSnapshotRestoreIT.java | 22 +++++++++-- .../ESBlobStoreRepositoryIntegTestCase.java | 23 ++++++++--- 5 files changed, 91 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 0bade8cdc8741..c6cbaa50cdf02 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -221,15 +221,17 @@ public void verifyRepository(final String repositoryName, final ActionListener() { @Override public void onResponse(VerifyResponse verifyResponse) { - try { - repository.endVerification(verificationToken); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage( - "[{}] failed to finish repository verification", repositoryName), e); - listener.onFailure(e); - return; - } - listener.onResponse(verifyResponse); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + try { + repository.endVerification(verificationToken); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage( + "[{}] failed to finish repository verification", repositoryName), e); + listener.onFailure(e); + return; + } + listener.onResponse(verifyResponse); + }); } @Override @@ -238,14 +240,16 @@ public void onFailure(Exception e) { } }); } catch (Exception e) { - try { - repository.endVerification(verificationToken); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage( - "[{}] failed to finish repository verification", repositoryName), inner); - } - listener.onFailure(e); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + try { + repository.endVerification(verificationToken); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn(() -> new ParameterizedMessage( + "[{}] failed to finish repository verification", repositoryName), inner); + } + listener.onFailure(e); + }); } } else { listener.onResponse(new VerifyResponse(new DiscoveryNode[0], new VerificationFailure[0])); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 8a764ddf42e4d..37f58e9172d4a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -298,6 +298,8 @@ protected BlobStore getBlobStore() { * maintains single lazy instance of {@link BlobContainer} */ protected BlobContainer blobContainer() { + verificationThreadCheck(); + BlobContainer blobContainer = this.blobContainer.get(); if (blobContainer == null) { synchronized (lock) { @@ -316,6 +318,8 @@ protected BlobContainer blobContainer() { * maintains single lazy instance of {@link BlobStore} */ protected BlobStore blobStore() { + verificationThreadCheck(); + BlobStore store = blobStore.get(); if (store == null) { synchronized (lock) { @@ -324,7 +328,6 @@ protected BlobStore blobStore() { if (lifecycle.started() == false) { throw new RepositoryException(metadata.name(), "repository is not in started state"); } - verificationThreadCheck(); try { store = createBlobStore(); } catch (RepositoryException e) { diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 9c14a9778760f..9f9bf18956e29 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -24,10 +24,16 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; @@ -37,8 +43,10 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData; @@ -50,6 +58,25 @@ */ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { + static final String REPO_TYPE = "fsLike"; + + protected Collection> getPlugins() { + return Arrays.asList(FsLikeRepoPlugin.class); + } + + public static class FsLikeRepoPlugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin { + + @Override + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { + return Collections.singletonMap(REPO_TYPE, + (metadata) -> new FsRepository(metadata, env, namedXContentRegistry) { + @Override + protected void verificationThreadCheck() { + } + }); + } + } + public void testRetrieveSnapshots() throws Exception { final Client client = client(); final Path location = ESIntegTestCase.randomRepoPath(node().settings()); @@ -58,7 +85,7 @@ public void testRetrieveSnapshots() throws Exception { logger.info("--> creating repository"); PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository(repositoryName) - .setType("fs") + .setType(REPO_TYPE) .setSettings(Settings.builder().put(node().settings()).put("location", location)) .get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); @@ -210,7 +237,7 @@ private BlobStoreRepository setupRepo() { PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository(repositoryName) - .setType("fs") + .setType(REPO_TYPE) .setSettings(Settings.builder().put(node().settings()).put("location", location)) .get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 3899bfcbec327..d2954a4c128ba 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.snapshots; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; @@ -93,6 +94,7 @@ import org.elasticsearch.script.StoredScriptsIT; import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.channels.SeekableByteChannel; @@ -1262,7 +1264,7 @@ public void testDeleteSnapshotWithMissingIndexAndShardMetadata() throws Exceptio RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName()); Repository repository = service.repository("test-repo"); - final Map indexIds = repository.getRepositoryData().getIndices(); + final Map indexIds = getRepositoryData(repository).getIndices(); final Path indicesPath = repo.resolve("indices"); logger.info("--> delete index metadata and shard metadata"); @@ -2564,7 +2566,7 @@ public void testDeleteOrphanSnapshot() throws Exception { logger.info("--> emulate an orphan snapshot"); RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName()); - final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData(); + final RepositoryData repositoryData = getRepositoryData(repositoriesService.repository(repositoryName)); final IndexId indexId = repositoryData.resolveIndexId(idxName); clusterService.submitStateUpdateTask("orphan snapshot test", new ClusterStateUpdateTask() { @@ -2785,7 +2787,8 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception { RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName()); Repository repository = service.repository("test-repo"); - final Map indexIds = repository.getRepositoryData().getIndices(); + final RepositoryData repositoryData = getRepositoryData(repository); + final Map indexIds = repositoryData.getIndices(); assertThat(indexIds.size(), equalTo(nbIndices)); // Choose a random index from the snapshot @@ -3446,6 +3449,19 @@ public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { } } + private RepositoryData getRepositoryData(Repository repository) throws InterruptedException { + ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName()); + final SetOnce repositoryData = new SetOnce<>(); + final CountDownLatch latch = new CountDownLatch(1); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + repositoryData.set(repository.getRepositoryData()); + latch.countDown(); + }); + + latch.await(); + return repositoryData.get(); + } + private void verifySnapshotInfo(final GetSnapshotsResponse response, final Map> indicesPerSnapshot) { for (SnapshotInfo snapshotInfo : response.getSnapshots()) { final List expected = snapshotInfo.indices(); diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 226ec63845a9f..439728bac9ea6 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.repositories.blobstore; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; @@ -32,12 +33,14 @@ import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotRestoreException; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -202,7 +205,7 @@ public void testMultipleSnapshotAndRollback() throws Exception { } } - public void testIndicesDeletedFromRepository() { + public void testIndicesDeletedFromRepository() throws Exception { Client client = client(); logger.info("--> creating repository"); @@ -244,12 +247,22 @@ public void testIndicesDeletedFromRepository() { logger.info("--> verify index folder deleted from blob container"); RepositoriesService repositoriesSvc = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName()); + ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName()); @SuppressWarnings("unchecked") BlobStoreRepository repository = (BlobStoreRepository) repositoriesSvc.repository(repoName); - BlobContainer indicesBlobContainer = repository.blobStore().blobContainer(repository.basePath().add("indices")); - RepositoryData repositoryData = repository.getRepositoryData(); - for (IndexId indexId : repositoryData.getIndices().values()) { + + final SetOnce indicesBlobContainer = new SetOnce<>(); + final SetOnce repositoryData = new SetOnce<>(); + final CountDownLatch latch = new CountDownLatch(1); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + indicesBlobContainer.set(repository.blobStore().blobContainer(repository.basePath().add("indices"))); + repositoryData.set(repository.getRepositoryData()); + latch.countDown(); + }); + + latch.await(); + for (IndexId indexId : repositoryData.get().getIndices().values()) { if (indexId.getName().equals("test-idx-3")) { - assertFalse(indicesBlobContainer.blobExists(indexId.getId())); // deleted index + assertFalse(indicesBlobContainer.get().blobExists(indexId.getId())); // deleted index } } }