Skip to content

Commit

Permalink
[ISSUE#9097] Add UT.
Browse files Browse the repository at this point in the history
  • Loading branch information
KiteSoar committed Feb 23, 2025
1 parent 690acb0 commit ce88393
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -73,6 +74,8 @@
import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
Expand Down Expand Up @@ -230,7 +233,8 @@ public void init() throws Exception {
field.set(brokerController, broker2Client);

//doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor();

BrokerConfig config = brokerController.getBrokerConfig();
config.setEnableAsyncTaskCheck(true);
adminBrokerProcessor = new AdminBrokerProcessor(brokerController);

systemTopicSet = Sets.newHashSet(
Expand Down Expand Up @@ -1328,6 +1332,44 @@ public void testResetMasterFlushOffset() throws RemotingCommandException {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testCheckAsyncTaskStatusByTaskId() throws RemotingCommandException {
CheckRocksdbCqWriteProgressRequestHeader requestHeader = new CheckRocksdbCqWriteProgressRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, requestHeader);

RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
CheckRocksdbCqWriteResult results = RemotingSerializable.decode(response.getBody(), CheckRocksdbCqWriteResult.class);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);

CheckAsyncTaskStatusRequestHeader requestHeader1 = new CheckAsyncTaskStatusRequestHeader();
requestHeader1.setTaskId(results.getTaskId());
RemotingCommand request1 = RemotingCommand.createRequestCommand(RequestCode.CHECK_ASYNC_TASK_STATUS, requestHeader1);
HashMap<String, String> extFields = new HashMap<>();
extFields.put("taskId",results.getTaskId());
request1.setExtFields(extFields);
RemotingCommand response1 = adminBrokerProcessor.processRequest(handlerContext, request1);
assertThat(response1.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testCheckAsyncTaskStatusByTaskName() throws RemotingCommandException {
CheckRocksdbCqWriteProgressRequestHeader requestHeader = new CheckRocksdbCqWriteProgressRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, requestHeader);

RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);

String taskName = "checkRocksdbCqWriteProgress";
CheckAsyncTaskStatusRequestHeader requestHeader1 = new CheckAsyncTaskStatusRequestHeader();
requestHeader1.setTaskName(taskName);
RemotingCommand request1 = RemotingCommand.createRequestCommand(RequestCode.CHECK_ASYNC_TASK_STATUS, requestHeader1);
HashMap<String, String> extFields = new HashMap<>();
extFields.put("taskName",taskName);
request1.setExtFields(extFields);
RemotingCommand response1 = adminBrokerProcessor.processRequest(handlerContext, request1);
assertThat(response1.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

private ResetOffsetRequestHeader createRequestHeader(String topic,String group,long timestamp,boolean force,long offset,int queueId) {
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader {

@Override
public void checkFields() throws RemotingCommandException {
if (StringUtils.isBlank(taskName)) {
throw new RemotingCommandException("taskName cannot be null or blank");
if (StringUtils.isBlank(taskName) && StringUtils.isBlank(taskId)) {
throw new RemotingCommandException("taskName and taskId cannot be empty at the same time");
}
if (maxLimit <= 0) {
throw new RemotingCommandException("maxLimit must be greater than 0");
if (maxLimit < 0) {
throw new RemotingCommandException("maxLimit cannot be less than 0.");
}
if (taskStatus != null && (taskStatus < 0 || taskStatus > 3)) {
throw new RemotingCommandException("taskStatus must be between 0 and 3");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,17 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
String brokerAddr = commandLine.hasOption('b') ? commandLine.getOptionValue('b').trim() : null;
String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null;
String namesAddr = commandLine.hasOption('n') ? commandLine.getOptionValue('n').trim() : null;
int maxLimit = commandLine.hasOption('m') ? Integer.parseInt(commandLine.getOptionValue('m').trim()) : DEFAULT_MAX_TASKS;
String maxLimitStr = commandLine.hasOption('m') ? commandLine.getOptionValue('m').trim() : null;
int maxLimit = DEFAULT_MAX_TASKS;
if (maxLimitStr != null && !maxLimitStr.isEmpty()) {
try {
maxLimit = Integer.parseInt(maxLimitStr);
} catch (NumberFormatException e) {
System.out.print("Illegal maxLimit parameter value");
return;
}
}

Integer taskStatus = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s').trim()) : null;

try {
Expand Down Expand Up @@ -146,10 +156,11 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
System.out.printf("Cluster '%s' not found or has no brokers.%n", clusterName);
return;
}
int finalMaxLimit = maxLimit;
brokerNames.forEach(brokerName -> {
BrokerData brokerData = brokerAddrTable.get(brokerName);
if (brokerData != null) {
checkAsyncTaskStatusOnBroker(brokerData.selectBrokerAddr(), taskName, taskId, brokerName, maxLimit, taskStatus);
checkAsyncTaskStatusOnBroker(brokerData.selectBrokerAddr(), taskName, taskId, brokerName, finalMaxLimit, taskStatus);
}
});
} else {
Expand Down

0 comments on commit ce88393

Please sign in to comment.