Skip to content

Commit

Permalink
Fix unexpected task status (#1767)
Browse files Browse the repository at this point in the history
* fix unexpected task status

* fix task-manager become single-node mode due to server-info is missing

* fix don't auto save server-info on master node

* fix: create index label task
  • Loading branch information
javeme authored Mar 11, 2022
1 parent 6856cc2 commit 5bb860f
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public Map<String, Object> update(@Context GraphManager manager,

private static TaskStatus parseStatus(String status) {
try {
return TaskStatus.valueOf(status);
return TaskStatus.valueOf(status.toUpperCase());
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"Status value must be in %s, but got '%s'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,19 @@ public String toString() {
@Override
public void run() {
if (this.cancelled()) {
// Scheduled task is running after cancelled
// A task is running after cancelled which scheduled/queued before
return;
}

TaskManager.setContext(this.context());
try {
assert this.status.code() < TaskStatus.RUNNING.code() : this.status;
if (this.checkDependenciesSuccess()) {
/*
* FIXME: worker node may reset status to RUNNING here, and the
* status in DB is CANCELLING that set by master node,
* it will lead to cancel() operation not to take effect.
*/
this.status(TaskStatus.RUNNING);
super.run();
}
Expand All @@ -308,7 +313,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
// Callback for saving status to store
this.callable.cancelled();
} else {
// Maybe the worker is still running then set status SUCCESS
// Maybe worker node is still running then set status SUCCESS
cancelled = false;
}
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,7 @@ public synchronized void initServerInfo(Id server, NodeRole role) {
} while (page != null);
}

HugeServerInfo serverInfo = new HugeServerInfo(server, role);
serverInfo.maxLoad(this.calcMaxLoad());
this.save(serverInfo);

LOG.info("Init server info: {}", serverInfo);
this.saveServerInfo(this.selfServerId, this.selfServerRole);
}

public Id selfServerId() {
Expand All @@ -186,8 +182,10 @@ public boolean onlySingleNode() {

public void heartbeat() {
HugeServerInfo serverInfo = this.selfServerInfo();
if (serverInfo == null) {
return;
if (serverInfo == null && this.selfServerId != null &&
this.selfServerRole != NodeRole.MASTER) {
serverInfo = this.saveServerInfo(this.selfServerId,
this.selfServerRole);
}
serverInfo.updateTime(DateUtil.now());
this.save(serverInfo);
Expand Down Expand Up @@ -239,7 +237,11 @@ protected synchronized HugeServerInfo pickWorkerNode(
}
}

this.onlySingleNode = !hasWorkerNode;
boolean singleNode = !hasWorkerNode;
if (singleNode != this.onlySingleNode) {
LOG.info("Switch only_single_node to {}", singleNode);
this.onlySingleNode = singleNode;
}

// Only schedule to master if there is no workers and master is suitable
if (!hasWorkerNode) {
Expand All @@ -260,26 +262,35 @@ private GraphTransaction tx() {
return this.graph.systemTransaction();
}

private Id save(HugeServerInfo server) {
private HugeServerInfo saveServerInfo(Id server, NodeRole role) {
HugeServerInfo serverInfo = new HugeServerInfo(server, role);
serverInfo.maxLoad(this.calcMaxLoad());
this.save(serverInfo);

LOG.info("Init server info: {}", serverInfo);
return serverInfo;
}

private Id save(HugeServerInfo serverInfo) {
return this.call(() -> {
// Construct vertex from server info
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
throw new HugeException("Schema is missing for %s '%s'",
HugeServerInfo.P.SERVER, server);
HugeServerInfo.P.SERVER, serverInfo);
}
HugeVertex vertex = this.tx().constructVertex(false,
server.asArray());
serverInfo.asArray());
// Add or update server info in backend store
vertex = this.tx().addVertex(vertex);
return vertex.id();
});
}

private int save(Collection<HugeServerInfo> servers) {
private int save(Collection<HugeServerInfo> serverInfos) {
return this.call(() -> {
if (servers.isEmpty()) {
return servers.size();
if (serverInfos.isEmpty()) {
return serverInfos.size();
}
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
Expand All @@ -289,7 +300,7 @@ private int save(Collection<HugeServerInfo> servers) {
// Save server info in batch
GraphTransaction tx = this.tx();
int updated = 0;
for (HugeServerInfo server : servers) {
for (HugeServerInfo server : serverInfos) {
if (!server.updated()) {
continue;
}
Expand Down Expand Up @@ -319,7 +330,11 @@ private <V> V call(Callable<V> callable) {
}

private HugeServerInfo selfServerInfo() {
return this.serverInfo(this.selfServerId);
HugeServerInfo selfServerInfo = this.serverInfo(this.selfServerId);
if (selfServerInfo == null) {
LOG.warn("ServerInfo is missing: {}", this.selfServerId);
}
return selfServerInfo;
}

private HugeServerInfo serverInfo(Id server) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public <V> Future<?> schedule(HugeTask<V> task) {
/*
* Due to EphemeralJob won't be serialized and deserialized through
* shared storage, submit EphemeralJob immediately on master
* NOTE: don't need to save EphemeralJob task
*/
task.status(TaskStatus.QUEUED);
return this.submitTask(task);
Expand All @@ -228,16 +229,16 @@ public <V> Future<?> schedule(HugeTask<V> task) {

if (this.serverManager().onlySingleNode() && !task.computer()) {
/*
* Speed up for single node, submit task immediately
* this can be removed without affecting logic
* Speed up for single node, submit task immediately,
* this code can be removed without affecting logic
*/
task.status(TaskStatus.QUEUED);
task.server(this.serverManager().selfServerId());
this.save(task);
return this.submitTask(task);
} else {
/*
* Just set SCHEDULING status and save task
* Just set SCHEDULING status and save task,
* it will be scheduled by periodic scheduler worker
*/
task.status(TaskStatus.SCHEDULING);
Expand Down Expand Up @@ -394,6 +395,7 @@ protected void executeTasksOnWorker(Id server) {
}
if (taskServer.equals(server)) {
task.status(TaskStatus.QUEUED);
this.save(task);
this.submitTask(task);
}
}
Expand Down Expand Up @@ -467,6 +469,7 @@ protected void remove(HugeTask<?> task) {

@Override
public <V> void save(HugeTask<V> task) {
LOG.debug("Saving task: {}", task);
task.scheduler(this);
E.checkArgumentNotNull(task, "Task can't be null");
this.call(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public final class TaskManager {
"server-info-db-worker-%d";
public static final String TASK_SCHEDULER = "task-scheduler-%d";

protected static final int SCHEDULE_PERIOD = 1; // Unit second
protected static final long SCHEDULE_PERIOD = 1000L; // unit ms

private static final int THREADS = 4;
private static final TaskManager MANAGER = new TaskManager(THREADS);
Expand Down Expand Up @@ -79,10 +79,11 @@ private TaskManager(int pool) {
// For schedule task to run, just one thread is ok
this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
1, TASK_SCHEDULER);
// Start after 10s waiting for HugeGraphServer startup
// Start after 10x period time waiting for HugeGraphServer startup
this.schedulerExecutor.scheduleWithFixedDelay(this::scheduleOrExecuteJob,
10L, SCHEDULE_PERIOD,
TimeUnit.SECONDS);
10 * SCHEDULE_PERIOD,
SCHEDULE_PERIOD,
TimeUnit.MILLISECONDS);
}

public void addScheduler(HugeGraphParams graph) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@
import org.junit.BeforeClass;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.util.CollectionUtil;
import com.baidu.hugegraph.util.JsonUtil;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;

public class BaseApiTest {
Expand Down Expand Up @@ -294,7 +296,7 @@ protected static void initEdgeLabel() {
+ "}");
}

protected static void initIndexLabel() {
protected static int initIndexLabel() {
String path = URL_PREFIX + SCHEMA_ILS;

Response r = client.post(path, "{\n"
Expand All @@ -303,11 +305,13 @@ protected static void initIndexLabel() {
+ "\"base_value\": \"person\",\n"
+ "\"index_type\": \"SECONDARY\",\n"
+ "\"check_exist\": false,\n"
+ "\"rebuild\": false,\n"
+ "\"fields\": [\n"
+ "\"city\"\n"
+ "]\n"
+ "}");
assertResponseStatus(202, r);
String content = assertResponseStatus(202, r);
return assertJsonContains(content, "task_id");
}

protected static void initEdge() {
Expand Down Expand Up @@ -519,6 +523,8 @@ protected static void clearSchema() {
List<Map> list = readList(content, type, Map.class);
List<Object> names = list.stream().map(e -> e.get("name"))
.collect(Collectors.toList());
Assert.assertTrue("Expect all names are unique: " + names,
CollectionUtil.allUnique(names));
Set<Integer> tasks = new HashSet<>();
names.forEach(name -> {
Response response = client.delete(path, (String) name);
Expand All @@ -540,13 +546,30 @@ protected static void clearSchema() {
}

protected static void waitTaskSuccess(int task) {
waitTaskStatus(task, ImmutableSet.of("success"));
}

protected static void waitTaskCompleted(int task) {
Set<String> completed = ImmutableSet.of("success",
"cancelled",
"failed");
waitTaskStatus(task, completed);
}

protected static void waitTaskStatus(int task, Set<String> expectedStatus) {
String status;
int times = 0;
int maxTimes = 100000;
do {
Response r = client.get("/graphs/hugegraph/tasks/",
String.valueOf(task));
String content = assertResponseStatus(200, r);
status = assertJsonContains(content, "task_status");
} while (!"success".equals(status));
if (times++ > maxTimes) {
Assert.fail(String.format("Failed to wait for task %s " +
"due to timeout", task));
}
} while (!expectedStatus.contains(status));
}

protected static String parseId(String content) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void prepareSchema() {

@Test
public void testList() {
// create a task
int taskId = this.rebuild();

Response r = client().get(path, ImmutableMap.of("limit", -1));
Expand All @@ -52,6 +53,12 @@ public void testList() {
assertArrayContains(tasks, "id", taskId);

waitTaskSuccess(taskId);

r = client().get(path, String.valueOf(taskId));
content = assertResponseStatus(200, r);
String status = assertJsonContains(content, "task_status");
Assert.assertEquals("success", status);

/*
* FIXME: sometimes may get results of RUNNING tasks after the task
* status is SUCCESS, which is stored in DB if there are worker
Expand All @@ -62,23 +69,36 @@ public void testList() {
r = client().get(path, ImmutableMap.of("status", "RUNNING"));
content = assertResponseStatus(200, r);
tasks = assertJsonContains(content, "tasks");
Assert.assertTrue(tasks.toString(), tasks.isEmpty());
String message = String.format("Expect none RUNNING tasks(%d), " +
"but got %s", taskId, tasks);
Assert.assertTrue(message, tasks.isEmpty());
}

@Test
public void testGet() {
// create a task
int taskId = this.rebuild();

Response r = client().get(path, String.valueOf(taskId));
String content = assertResponseStatus(200, r);
assertJsonContains(content, "id");

waitTaskSuccess(taskId);

r = client().get(path, String.valueOf(taskId));
content = assertResponseStatus(200, r);
String status = assertJsonContains(content, "task_status");
Assert.assertEquals("success", status);
}

@Test
public void testCancel() {
// create a task
int taskId = this.gremlinJob();

sleepAWhile();

// cancel task
Map<String, Object> params = ImmutableMap.of("action", "cancel");
Response r = client().put(path, String.valueOf(taskId), "", params);
String content = r.readEntity(String.class);
Expand All @@ -88,6 +108,12 @@ public void testCancel() {
String status = assertJsonContains(content, "task_status");
Assert.assertTrue(status, status.equals("cancelling") ||
status.equals("cancelled"));
/*
* NOTE: should be waitTaskStatus(taskId, "cancelled"), but worker
* node may ignore the CANCELLING status due to now we can't atomic
* update task status, and then the task is running to SUCCESS.
*/
waitTaskCompleted(taskId);
} else {
assert r.getStatus() == 400;
String error = String.format(
Expand All @@ -103,14 +129,17 @@ public void testCancel() {

@Test
public void testDelete() {
// create a task
int taskId = this.rebuild();

waitTaskSuccess(taskId);
// delete task
Response r = client().delete(path, String.valueOf(taskId));
assertResponseStatus(204, r);
}

private int rebuild() {
// create a rebuild_index task
String rebuildPath = "/graphs/hugegraph/jobs/rebuild/indexlabels";
String personByCity = "personByCity";
Map<String, Object> params = ImmutableMap.of();
Expand Down

0 comments on commit 5bb860f

Please sign in to comment.