Skip to content

Commit

Permalink
[ISSUE #9097]Add new command to check async task status in broker.
Browse files Browse the repository at this point in the history
  • Loading branch information
KiteSoar committed Feb 7, 2025
1 parent de4e48d commit bf4a370
Show file tree
Hide file tree
Showing 13 changed files with 1,054 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker;

import com.alibaba.fastjson.JSON;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.AsyncTask;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.TaskStatus;

public class AdminAsyncTaskManager {

// taskId -> AsyncTask
private final Cache<String, AsyncTask> asyncTaskCache;

// taskName -> taskId
private final ConcurrentHashMap<String, List<String>> taskNameToIdsMap;

public AdminAsyncTaskManager() {
this.asyncTaskCache = Caffeine.newBuilder()
.expireAfterWrite(30, TimeUnit.MINUTES)
.maximumSize(10000)
.build();

this.taskNameToIdsMap = new ConcurrentHashMap<>();
}

/**
* Creates a new asynchronous task with a unique taskId.
*
* @param taskName The name of the task.
* @param future The CompletableFuture representing the asynchronous task.
* @return The generated taskId.
*/
public String createTask(String taskName, CompletableFuture<?> future) {
String taskId = UUID.randomUUID().toString();
AsyncTask task = new AsyncTask(taskName, taskId, future);

asyncTaskCache.put(taskId, task);
taskNameToIdsMap.computeIfAbsent(taskName, k -> new ArrayList<>()).add(taskId);

future.whenComplete((result, throwable) -> {
if (throwable != null) {
task.setStatus(TaskStatus.ERROR.getValue());
task.setResult(throwable.getMessage());
} else {
task.setStatus(TaskStatus.SUCCESS.getValue());
task.setResult(JSON.toJSONString(result));
}
});

return taskId;
}

/**
* Get all taskIds associated with a given task name.
*
* @param taskName The name of the task.
* @return List of taskIds for the given task name.
*/
public List<String> getTaskIdsByName(String taskName) {
return taskNameToIdsMap.getOrDefault(taskName, Collections.emptyList());
}

/**
* Get the status of a specific task.
*
* @param taskId The unique identifier of the task.
* @return The AsyncTask object, or null if not found.
*/
public AsyncTask getTaskStatus(String taskId) {
return asyncTaskCache.getIfPresent(taskId);
}

/**
* Update the status and result of a specific task.
*
* @param taskId The unique identifier of the task.
* @param status The new status of the task.
* @param result The result of the task.
*/
public void updateTaskStatus(String taskId, int status, String result) {
AsyncTask task = asyncTaskCache.getIfPresent(taskId);
if (task != null) {
task.setStatus(status);
task.setResult(result);
asyncTaskCache.put(taskId, task);
}
}

/**
* Remove a specific task from the cache and mappings.
*
* @param taskId The unique identifier of the task.
*/
public void removeTask(String taskId) {
AsyncTask task = asyncTaskCache.getIfPresent(taskId);
if (task != null) {
asyncTaskCache.invalidate(taskId);
taskNameToIdsMap.computeIfPresent(task.getTaskName(), (k, v) -> {
v.remove(taskId);
return v.isEmpty() ? null : v;
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
Expand All @@ -56,6 +57,7 @@
import org.apache.rocketmq.auth.authorization.exception.AuthorizationException;
import org.apache.rocketmq.auth.authorization.model.Acl;
import org.apache.rocketmq.auth.authorization.model.Resource;
import org.apache.rocketmq.broker.AdminAsyncTaskManager;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.auth.converter.AclConverter;
import org.apache.rocketmq.broker.auth.converter.UserConverter;
Expand All @@ -72,6 +74,7 @@
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AsyncTask;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
Expand Down Expand Up @@ -149,6 +152,8 @@
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
Expand Down Expand Up @@ -245,9 +250,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
protected final BrokerController brokerController;
protected Set<String> configBlackList = new HashSet<>();
private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
private final AdminAsyncTaskManager asyncTaskManager;

public AdminBrokerProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
this.asyncTaskManager = new AdminAsyncTaskManager();
initConfigBlackList();
}

Expand Down Expand Up @@ -415,6 +422,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return this.listAcl(ctx, request);
case RequestCode.POP_ROLLBACK:
return this.transferPopToFsStore(ctx, request);
case RequestCode.CHECK_ASYNC_TASK_STATUS:
return this.checkAsyncTaskStatus(ctx, request);
default:
return getUnknownCmdResponse(ctx, request);
}
Expand Down Expand Up @@ -487,15 +496,16 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) {
CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue());
Runnable runnable = () -> {

CompletableFuture<CheckRocksdbCqWriteResult> future = CompletableFuture.supplyAsync(() -> {
try {
CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request);
LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult));
return doCheckRocksdbCqWriteProgress(ctx, request);
} catch (Exception e) {
LOGGER.error("checkRocksdbCqWriteProgress error", e);
throw new CompletionException(e);
}
};
asyncExecuteWorker.submit(runnable);
}, asyncExecuteWorker);

asyncTaskManager.createTask("checkRocksdbCqWriteProgress", future);
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setBody(JSON.toJSONBytes(result));
Expand Down Expand Up @@ -3597,4 +3607,31 @@ private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, Remoting
}
return response;
}

private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final CheckAsyncTaskStatusRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckAsyncTaskStatusRequestHeader.class);
List<String> taskIds = asyncTaskManager.getTaskIdsByName(requestHeader.getTaskName());
if (CollectionUtils.isEmpty(taskIds)) {
throw new RemotingCommandException("taskName: " + requestHeader.getTaskName() + " not found");
}

try {
int maxResults = Math.min(requestHeader.getMaxLimit(), 200);
Integer filterStatus = requestHeader.getTaskStatus();

List<AsyncTask> asyncTasks = taskIds.stream()
.map(asyncTaskManager::getTaskStatus)
.filter(task -> filterStatus == null || task.getStatus() == filterStatus)
.limit(maxResults)
.collect(Collectors.toList());

RemotingCommand response = RemotingCommand.createResponseCommand(CheckAsyncTaskStatusResponseHeader.class);
response.setCode(ResponseCode.SUCCESS);
response.setBody(JSON.toJSONBytes(asyncTasks));
return response;
} catch (Exception e) {
LOGGER.error("checkAsyncTaskStatus error", e);
return RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.rpchook.NamespaceRpcHook;
import org.apache.rocketmq.common.AsyncTask;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.MQVersion;
Expand Down Expand Up @@ -149,6 +150,7 @@
import org.apache.rocketmq.remoting.protocol.header.AddBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
Expand Down Expand Up @@ -3601,4 +3603,17 @@ public void exportPopRecord(String brokerAddr, long timeout) throws RemotingConn
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public List<AsyncTask> checkAsyncTaskStatus(String brokerAddr, CheckAsyncTaskStatusRequestHeader requestHeader,
long timeoutMillis)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ASYNC_TASK_STATUS,
requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
assert response != null;
if (response.getCode() == SUCCESS) {
return RemotingSerializable.decodeList(response.getBody(), AsyncTask.class);
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
}
98 changes: 98 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/AsyncTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common;

import java.util.Date;
import java.util.concurrent.CompletableFuture;

public class AsyncTask {

private String taskName;

private String taskId;

private int status;

private Date createTime;

private String result;

private final CompletableFuture<?> future;

public AsyncTask(String taskName, String taskId, CompletableFuture<?> future) {
this.taskName = taskName;
this.taskId = taskId;
this.status = TaskStatus.INIT.getValue();
this.createTime = new Date();
this.result = null;
this.future = future;
}

public String getTaskName() {
return taskName;
}

public void setTaskName(String taskName) {
this.taskName = taskName;
}

public int getStatus() {
return status;
}

public void setStatus(int status) {
this.status = status;
}

public String getResult() {
return result;
}

public void setResult(String result) {
this.result = result;
}

public Date getCreateTime() {
return createTime;
}

public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

public String getTaskId() {
return taskId;
}

public void setTaskId(String taskId) {
this.taskId = taskId;
}

public CompletableFuture<?> getFuture() {
return future;
}

public static String getDescFromStatus(int status) {
for (TaskStatus taskStatus : TaskStatus.values()) {
if (taskStatus.getValue() == status) {
return taskStatus.getDesc();
}
}
return "unknown";
}
}
Loading

0 comments on commit bf4a370

Please sign in to comment.