
Commit

enforcing access to blobStore / blobContainer only to snapshot and generic threads
Vladimir Dolzhenko committed Jul 12, 2018
1 parent d8c9593 commit 8dfa60d
Showing 5 changed files with 91 additions and 28 deletions.
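The change gates blob-store access in BlobStoreRepository behind a verificationThreadCheck() call and routes callers through the SNAPSHOT thread pool. The body of that check is not part of this diff; the sketch below is only a plausible shape for it (the thread-name assertion is an assumption, not the committed implementation) and assumes org.elasticsearch.threadpool.ThreadPool is in scope:

protected void verificationThreadCheck() {
    // assumed behaviour: blob-store access is only allowed from SNAPSHOT or GENERIC
    // thread pool threads; Elasticsearch thread names embed the pool name in brackets
    final String threadName = Thread.currentThread().getName();
    assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT + ']')
        || threadName.contains('[' + ThreadPool.Names.GENERIC + ']')
        : "expected current thread [" + Thread.currentThread() + "] to be a snapshot or generic thread";
}

The test plugin further down overrides this method with a no-op so that unit tests can exercise these code paths from the test thread.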
@@ -221,15 +221,17 @@ public void verifyRepository(final String repositoryName, final ActionListener<V
verifyAction.verify(repositoryName, verificationToken, new ActionListener<VerifyResponse>() {
@Override
public void onResponse(VerifyResponse verifyResponse) {
try {
repository.endVerification(verificationToken);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] failed to finish repository verification", repositoryName), e);
listener.onFailure(e);
return;
}
listener.onResponse(verifyResponse);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
repository.endVerification(verificationToken);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] failed to finish repository verification", repositoryName), e);
listener.onFailure(e);
return;
}
listener.onResponse(verifyResponse);
});
}

@Override
@@ -238,14 +240,16 @@ public void onFailure(Exception e) {
}
});
} catch (Exception e) {
try {
repository.endVerification(verificationToken);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"[{}] failed to finish repository verification", repositoryName), inner);
}
listener.onFailure(e);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
repository.endVerification(verificationToken);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"[{}] failed to finish repository verification", repositoryName), inner);
}
listener.onFailure(e);
});
}
} else {
listener.onResponse(new VerifyResponse(new DiscoveryNode[0], new VerificationFailure[0]));
@@ -298,6 +298,8 @@ protected BlobStore getBlobStore() {
* maintains single lazy instance of {@link BlobContainer}
*/
protected BlobContainer blobContainer() {
verificationThreadCheck();

BlobContainer blobContainer = this.blobContainer.get();
if (blobContainer == null) {
synchronized (lock) {
@@ -316,6 +318,8 @@ protected BlobContainer blobContainer() {
* maintains single lazy instance of {@link BlobStore}
*/
protected BlobStore blobStore() {
verificationThreadCheck();

BlobStore store = blobStore.get();
if (store == null) {
synchronized (lock) {
@@ -324,7 +328,6 @@ protected BlobStore blobStore() {
if (lifecycle.started() == false) {
throw new RepositoryException(metadata.name(), "repository is not in started state");
}
verificationThreadCheck();
try {
store = createBlobStore();
} catch (RepositoryException e) {
@@ -24,10 +24,16 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
@@ -37,8 +43,10 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
@@ -50,6 +58,25 @@
*/
public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {

static final String REPO_TYPE = "fsLike";

protected Collection<Class<? extends Plugin>> getPlugins() {
return Arrays.asList(FsLikeRepoPlugin.class);
}

public static class FsLikeRepoPlugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
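// this plugin registers an "fsLike" repository type that behaves like "fs" except that
// verificationThreadCheck() is overridden to a no-op, so the test can call blob-store
// methods directly from the test thread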

ywelsch (Contributor) commented on Jul 13, 2018: can you add a comment saying why we have this plugin?

vladimirdolzhenko (Contributor) commented on Jul 13, 2018: agree


@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.singletonMap(REPO_TYPE,
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry) {
@Override
protected void verificationThreadCheck() {
}
});
}
}

public void testRetrieveSnapshots() throws Exception {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
@@ -58,7 +85,7 @@ public void testRetrieveSnapshots() throws Exception {
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse =
client.admin().cluster().preparePutRepository(repositoryName)
.setType("fs")
.setType(REPO_TYPE)
.setSettings(Settings.builder().put(node().settings()).put("location", location))
.get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@@ -210,7 +237,7 @@ private BlobStoreRepository setupRepo() {

PutRepositoryResponse putRepositoryResponse =
client.admin().cluster().preparePutRepository(repositoryName)
.setType("fs")
.setType(REPO_TYPE)
.setSettings(Settings.builder().put(node().settings()).put("location", location))
.get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
@@ -19,6 +19,7 @@

package org.elasticsearch.snapshots;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
@@ -93,6 +94,7 @@
import org.elasticsearch.script.StoredScriptsIT;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
@@ -1262,7 +1264,7 @@ public void testDeleteSnapshotWithMissingIndexAndShardMetadata() throws Exceptio
RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
Repository repository = service.repository("test-repo");

final Map<String, IndexId> indexIds = repository.getRepositoryData().getIndices();
final Map<String, IndexId> indexIds = getRepositoryData(repository).getIndices();
final Path indicesPath = repo.resolve("indices");

logger.info("--> delete index metadata and shard metadata");
@@ -2564,7 +2566,7 @@ public void testDeleteOrphanSnapshot() throws Exception {

logger.info("--> emulate an orphan snapshot");
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();
final RepositoryData repositoryData = getRepositoryData(repositoriesService.repository(repositoryName));
final IndexId indexId = repositoryData.resolveIndexId(idxName);

clusterService.submitStateUpdateTask("orphan snapshot test", new ClusterStateUpdateTask() {
@@ -2785,7 +2787,8 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception {
RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
Repository repository = service.repository("test-repo");

final Map<String, IndexId> indexIds = repository.getRepositoryData().getIndices();
final RepositoryData repositoryData = getRepositoryData(repository);
final Map<String, IndexId> indexIds = repositoryData.getIndices();
assertThat(indexIds.size(), equalTo(nbIndices));

// Choose a random index from the snapshot
@@ -3446,6 +3449,19 @@ public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
}
}

private RepositoryData getRepositoryData(Repository repository) throws InterruptedException {
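// repository.getRepositoryData() now has to run on a snapshot or generic thread, so
// dispatch it to the SNAPSHOT executor and block on a latch until it completes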
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
final SetOnce<RepositoryData> repositoryData = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
repositoryData.set(repository.getRepositoryData());
latch.countDown();
});

latch.await();
return repositoryData.get();
}

private void verifySnapshotInfo(final GetSnapshotsResponse response, final Map<String, List<String>> indicesPerSnapshot) {
for (SnapshotInfo snapshotInfo : response.getSnapshots()) {
final List<String> expected = snapshotInfo.indices();
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.repositories.blobstore;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
@@ -32,12 +33,14 @@
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -202,7 +205,7 @@ public void testMultipleSnapshotAndRollback() throws Exception {
}
}

public void testIndicesDeletedFromRepository() {
public void testIndicesDeletedFromRepository() throws Exception {
Client client = client();

logger.info("--> creating repository");
Expand Down Expand Up @@ -244,12 +247,22 @@ public void testIndicesDeletedFromRepository() {

logger.info("--> verify index folder deleted from blob container");
RepositoriesService repositoriesSvc = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
@SuppressWarnings("unchecked") BlobStoreRepository repository = (BlobStoreRepository) repositoriesSvc.repository(repoName);
BlobContainer indicesBlobContainer = repository.blobStore().blobContainer(repository.basePath().add("indices"));
RepositoryData repositoryData = repository.getRepositoryData();
for (IndexId indexId : repositoryData.getIndices().values()) {

final SetOnce<BlobContainer> indicesBlobContainer = new SetOnce<>();
final SetOnce<RepositoryData> repositoryData = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
indicesBlobContainer.set(repository.blobStore().blobContainer(repository.basePath().add("indices")));
repositoryData.set(repository.getRepositoryData());
latch.countDown();
});

latch.await();
for (IndexId indexId : repositoryData.get().getIndices().values()) {
if (indexId.getName().equals("test-idx-3")) {
assertFalse(indicesBlobContainer.blobExists(indexId.getId())); // deleted index
assertFalse(indicesBlobContainer.get().blobExists(indexId.getId())); // deleted index
}
}
}
