Skip to content

Commit ade684c

Browse files
committed
Adapt SyncedFlushService (#37691)
1 parent 14c3061 commit ade684c

File tree

2 files changed

+21
-16
lines changed

2 files changed

+21
-16
lines changed

server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java

+6-9
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.cluster.metadata.IndexMetaData;
3333
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3434
import org.elasticsearch.cluster.node.DiscoveryNode;
35-
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3635
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
3736
import org.elasticsearch.cluster.routing.ShardRouting;
3837
import org.elasticsearch.cluster.service.ClusterService;
@@ -289,16 +288,14 @@ private void reportSuccessWithExistingSyncId(ShardId shardId,
289288
listener.onResponse(new ShardsSyncedFlushResult(shardId, existingSyncId, totalShards, results));
290289
}
291290

292-
final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) {
293-
final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.getIndexName());
294-
if (indexRoutingTable == null) {
295-
IndexMetaData index = state.getMetaData().index(shardId.getIndex());
296-
if (index != null && index.getState() == IndexMetaData.State.CLOSE) {
297-
throw new IndexClosedException(shardId.getIndex());
298-
}
291+
final IndexShardRoutingTable getShardRoutingTable(final ShardId shardId, final ClusterState state) {
292+
final IndexMetaData indexMetaData = state.getMetaData().index(shardId.getIndex());
293+
if (indexMetaData == null) {
299294
throw new IndexNotFoundException(shardId.getIndexName());
295+
} else if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
296+
throw new IndexClosedException(shardId.getIndex());
300297
}
301-
final IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.id());
298+
final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(indexMetaData.getIndex()).shard(shardId.id());
302299
if (shardRoutingTable == null) {
303300
throw new ShardNotFoundException(shardId);
304301
}

server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java

+15-7
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020

2121
import org.elasticsearch.action.support.PlainActionFuture;
2222
import org.elasticsearch.cluster.ClusterState;
23+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2324
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2425
import org.elasticsearch.cluster.routing.ShardRouting;
2526
import org.elasticsearch.cluster.service.ClusterService;
2627
import org.elasticsearch.common.UUIDs;
2728
import org.elasticsearch.common.lease.Releasable;
29+
import org.elasticsearch.common.settings.Settings;
2830
import org.elasticsearch.common.xcontent.XContentType;
2931
import org.elasticsearch.index.IndexService;
3032
import org.elasticsearch.index.shard.IndexShard;
@@ -38,6 +40,8 @@
3840
import java.util.Map;
3941
import java.util.concurrent.ExecutionException;
4042

43+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
44+
4145
public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
4246

4347
public void testModificationPreventsFlushing() throws InterruptedException {
@@ -130,22 +134,26 @@ public void testSyncFailsIfOperationIsInFlight() throws InterruptedException, Ex
130134
}
131135

132136
public void testSyncFailsOnIndexClosedOrMissing() throws InterruptedException {
133-
createIndex("test");
137+
createIndex("test", Settings.builder()
138+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
139+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
140+
.build());
134141
IndexService test = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test"));
135-
IndexShard shard = test.getShardOrNull(0);
142+
final IndexShard shard = test.getShardOrNull(0);
143+
assertNotNull(shard);
144+
final ShardId shardId = shard.shardId();
145+
146+
final SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
136147

137-
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
138148
SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener();
139-
flushService.attemptSyncedFlush(new ShardId("test", "_na_", 1), listener);
149+
flushService.attemptSyncedFlush(new ShardId(shard.shardId().getIndex(), 1), listener);
140150
listener.latch.await();
141151
assertNotNull(listener.error);
142152
assertNull(listener.result);
143153
assertEquals(ShardNotFoundException.class, listener.error.getClass());
144154
assertEquals("no such shard", listener.error.getMessage());
145155

146-
final ShardId shardId = shard.shardId();
147-
148-
client().admin().indices().prepareClose("test").get();
156+
assertAcked(client().admin().indices().prepareClose("test"));
149157
listener = new SyncedFlushUtil.LatchedListener();
150158
flushService.attemptSyncedFlush(shardId, listener);
151159
listener.latch.await();

0 commit comments

Comments
 (0)