Skip to content

Commit

Permalink
Core: Remove plain execute method on TransportAction (#30998)
Browse files Browse the repository at this point in the history
TransportAction has many variants of execute. One of those variants
executes by returning a future, which is then often blocked on by
calling get(). This commit removes this variant of execute, instead
using a helper method for tests that want to block, or having tests
pass in a PlainActionFuture directly as a listener.

Co-authored-by: Simon Willnauer <[email protected]>
  • Loading branch information
rjernst and s1monw committed Jun 13, 2018
1 parent 1f6e874 commit a65b18f
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ protected TransportAction(Settings settings, String actionName, ThreadPool threa
this.taskManager = taskManager;
}

public final ActionFuture<Response> execute(Request request) {
PlainActionFuture<Response> future = newFuture();
execute(request, future);
return future;
}

/**
* Use this method when the transport action call should result in creation of a new task associated with the call.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
Expand Down Expand Up @@ -65,7 +66,9 @@ public TransportNodesListGatewayMetaState(Settings settings, ThreadPool threadPo
}

public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) {
return execute(new Request(nodesIds).timeout(timeout));
PlainActionFuture<NodesGatewayMetaState> future = PlainActionFuture.newFuture();
execute(new Request(nodesIds).timeout(timeout), future);
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
Expand Down Expand Up @@ -254,8 +255,8 @@ public void onFailure(Exception e) {
request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()));
// And send the cancellation request to a random node
CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request)
.get();
CancelTasksResponse response = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request);

// Awaiting for the main task to finish
responseLatch.await();
Expand Down Expand Up @@ -287,9 +288,9 @@ public void onFailure(Exception e) {
}

// Make sure that tasks are no longer running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().setTaskId(
new TaskId(testNodes[0].getNodeId(), mainTask.getId()))).get();
ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction,
new ListTasksRequest().setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())));
assertEquals(0, listTasksResponse.getTasks().size());

// Make sure that there are no leftover bans, the ban removal is async, so we might return from the cancellation
Expand Down Expand Up @@ -326,8 +327,8 @@ public void onFailure(Exception e) {
request.setReason("Testing Cancellation");
request.setParentTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()));
// And send the cancellation request to a random node
CancelTasksResponse response = testNodes[randomIntBetween(1, testNodes.length - 1)].transportCancelTasksAction.execute(request)
.get();
CancelTasksResponse response = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(1, testNodes.length - 1)].transportCancelTasksAction, request);

// Awaiting for the main task to finish
responseLatch.await();
Expand All @@ -336,16 +337,11 @@ public void onFailure(Exception e) {
assertThat(response.getTasks().size(), equalTo(testNodes.length));

assertBusy(() -> {
try {
// Make sure that main task is no longer running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().setTaskId(
new TaskId(testNodes[0].getNodeId(), mainTask.getId()))).get();
assertEquals(0, listTasksResponse.getTasks().size());

} catch (ExecutionException | InterruptedException ex) {
throw new RuntimeException(ex);
}
ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction,
new ListTasksRequest().setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())));
assertEquals(0, listTasksResponse.getTasks().size());
});
}

Expand Down Expand Up @@ -378,8 +374,9 @@ public void onFailure(Exception e) {
String mainNode = testNodes[0].getNodeId();

// Make sure that tasks are running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().setParentTaskId(new TaskId(mainNode, mainTask.getId()))).get();
ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction,
new ListTasksRequest().setParentTaskId(new TaskId(mainNode, mainTask.getId())));
assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size()));

// Simulate the coordinating node leaving the cluster
Expand All @@ -400,7 +397,7 @@ public void onFailure(Exception e) {
request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()));
// And send the cancellation request to a random node
CancelTasksResponse response = testNodes[0].transportCancelTasksAction.execute(request).get();
CancelTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportCancelTasksAction, request);
logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster");
// This node still thinks that's part of the cluster, so cancelling should look successful
if (response.getTasks().size() == 0) {
Expand All @@ -420,15 +417,10 @@ public void onFailure(Exception e) {

assertBusy(() -> {
// Make sure that tasks are no longer running
try {
ListTasksResponse listTasksResponse1 = testNodes[randomIntBetween(1, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().setTaskId(new TaskId(mainNode, mainTask.getId()))).get();
assertEquals(0, listTasksResponse1.getTasks().size());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (ExecutionException ex2) {
fail("shouldn't be here");
}
ListTasksResponse listTasksResponse1 = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(1, testNodes.length - 1)].transportListTasksAction,
new ListTasksRequest().setTaskId(new TaskId(mainNode, mainTask.getId())));
assertEquals(0, listTasksResponse1.getTasks().size());
});

// Wait for clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.action.admin.cluster.node.tasks;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
Expand Down Expand Up @@ -363,7 +365,7 @@ public void onFailure(Exception e) {
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction*"); // pick all test actions
logger.info("Listing currently running tasks using node [{}]", testNodeNum);
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
logger.info("Checking currently running tasks");
assertEquals(testNodes.length, response.getPerNodeTasks().size());

Expand All @@ -382,7 +384,7 @@ public void onFailure(Exception e) {
testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction[n]"); // only pick node actions
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
Expand All @@ -396,7 +398,7 @@ public void onFailure(Exception e) {

// Check task counts using transport with detailed description
listTasksRequest.setDetailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
Expand All @@ -405,7 +407,7 @@ public void onFailure(Exception e) {

// Make sure that the main task on coordinating node is the task that was returned to us by execute()
listTasksRequest.setActions("testAction"); // only pick the main task
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(1, response.getTasks().size());
assertEquals(mainTask.getId(), response.getTasks().get(0).getId());

Expand Down Expand Up @@ -433,15 +435,15 @@ public void testFindChildTasks() throws Exception {
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction");
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(1, response.getTasks().size());
String parentNode = response.getTasks().get(0).getTaskId().getNodeId();
long parentTaskId = response.getTasks().get(0).getId();

// Find tasks with common parent
listTasksRequest = new ListTasksRequest();
listTasksRequest.setParentTaskId(new TaskId(parentNode, parentTaskId));
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getTasks().size());
for (TaskInfo task : response.getTasks()) {
assertEquals("testAction[n]", task.getAction());
Expand All @@ -467,7 +469,7 @@ public void testTaskManagementOptOut() throws Exception {
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction*");
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(0, response.getTasks().size());

// Release all tasks and wait for response
Expand All @@ -488,7 +490,7 @@ public void testTasksDescriptions() throws Exception {
TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction[n]"); // only pick node actions
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
Expand All @@ -498,7 +500,7 @@ public void testTasksDescriptions() throws Exception {
// Check task counts using transport with detailed description
long minimalDurationNanos = System.nanoTime() - maximumStartTimeNanos;
listTasksRequest.setDetailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
Expand Down Expand Up @@ -536,8 +538,8 @@ public void onFailure(Exception e) {
request.setNodes(testNodes[0].getNodeId());
request.setReason("Testing Cancellation");
request.setActions(actionName);
CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request)
.get();
CancelTasksResponse response = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request);

// Shouldn't match any tasks since testAction doesn't support cancellation
assertEquals(0, response.getTasks().size());
Expand All @@ -549,7 +551,8 @@ public void onFailure(Exception e) {
request = new CancelTasksRequest();
request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].getNodeId(), task.getId()));
response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request).get();
response = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request);

// Shouldn't match any tasks since testAction doesn't support cancellation
assertEquals(0, response.getTasks().size());
Expand All @@ -560,8 +563,8 @@ public void onFailure(Exception e) {
// Make sure that task is still running
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions(actionName);
ListTasksResponse listResponse = testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction.execute
(listTasksRequest).get();
ListTasksResponse listResponse = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction, listTasksRequest);
assertEquals(1, listResponse.getPerNodeTasks().size());
// Verify that tasks are marked as non-cancellable
for (TaskInfo taskInfo : listResponse.getTasks()) {
Expand Down Expand Up @@ -595,7 +598,7 @@ protected NodeResponse nodeOperation(NodeRequest request) {
assertEquals(0, testNode.transportService.getTaskManager().getTasks().size());
}
NodesRequest request = new NodesRequest("Test Request");
NodesResponse responses = actions[0].execute(request).get();
NodesResponse responses = ActionTestUtils.executeBlocking(actions[0], request);
assertEquals(nodesCount, responses.failureCount());

// Make sure that actions are still registered in the task manager on all nodes
Expand Down Expand Up @@ -660,7 +663,7 @@ protected void taskOperation(TestTasksRequest request, Task task, ActionListener
// should be successful on all nodes except one
TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.setActions("testAction[n]"); // pick all test actions
TestTasksResponse response = tasksActions[0].execute(testTasksRequest).get();
TestTasksResponse response = ActionTestUtils.executeBlocking(tasksActions[0], testTasksRequest);
assertThat(response.getTaskFailures(), hasSize(1)); // one task failed
assertThat(response.getTaskFailures().get(0).getReason(), containsString("Task level failure"));
// Get successful responses from all nodes except one
Expand Down Expand Up @@ -730,7 +733,7 @@ protected void taskOperation(TestTasksRequest request, Task task, ActionListener
// should be successful on all nodes except nodes that we filtered out
TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.setActions("testAction[n]"); // pick all test actions
TestTasksResponse response = tasksActions[randomIntBetween(0, nodesCount - 1)].execute(testTasksRequest).get();
TestTasksResponse response = ActionTestUtils.executeBlocking(tasksActions[randomIntBetween(0, nodesCount - 1)], testTasksRequest);

// Get successful responses from all nodes except nodes that we filtered out
assertEquals(testNodes.length - filterNodes.size(), response.tasks.size());
Expand All @@ -757,7 +760,7 @@ public void testTasksToXContentGrouping() throws Exception {
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions(ListTasksAction.NAME + "*");
ListTasksResponse response = testNodes[0].transportListTasksAction.execute(listTasksRequest).get();
ListTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length + 1, response.getTasks().size());

Map<String, Object> byNodes = serialize(response, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -141,7 +142,7 @@ protected void doExecute(SearchRequest request, ActionListener<SearchResponse> l
multiSearchRequest.add(new SearchRequest());
}

MultiSearchResponse response = action.execute(multiSearchRequest).actionGet();
MultiSearchResponse response = ActionTestUtils.executeBlocking(action, multiSearchRequest);
assertThat(response.getResponses().length, equalTo(numSearchRequests));
assertThat(requests.size(), equalTo(numSearchRequests));
assertThat(errorHolder.get(), nullValue());
Expand Down
Loading

0 comments on commit a65b18f

Please sign in to comment.