diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 959b147d9d3..2377cc2defd 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -73,6 +74,8 @@ import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader; @@ -230,7 +233,8 @@ public void init() throws Exception { field.set(brokerController, broker2Client); //doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor(); - + BrokerConfig config = brokerController.getBrokerConfig(); + config.setEnableAsyncTaskCheck(true); adminBrokerProcessor = new AdminBrokerProcessor(brokerController); systemTopicSet = Sets.newHashSet( @@ -1328,6 +1332,44 @@ public void testResetMasterFlushOffset() throws RemotingCommandException { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testCheckAsyncTaskStatusByTaskId() throws RemotingCommandException { + CheckRocksdbCqWriteProgressRequestHeader requestHeader = new CheckRocksdbCqWriteProgressRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, requestHeader); + + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + CheckRocksdbCqWriteResult results = RemotingSerializable.decode(response.getBody(), CheckRocksdbCqWriteResult.class); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + CheckAsyncTaskStatusRequestHeader requestHeader1 = new CheckAsyncTaskStatusRequestHeader(); + requestHeader1.setTaskId(results.getTaskId()); + RemotingCommand request1 = RemotingCommand.createRequestCommand(RequestCode.CHECK_ASYNC_TASK_STATUS, requestHeader1); + HashMap extFields = new HashMap<>(); + extFields.put("taskId",results.getTaskId()); + request1.setExtFields(extFields); + RemotingCommand response1 = adminBrokerProcessor.processRequest(handlerContext, request1); + assertThat(response1.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testCheckAsyncTaskStatusByTaskName() throws RemotingCommandException { + CheckRocksdbCqWriteProgressRequestHeader requestHeader = new CheckRocksdbCqWriteProgressRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, requestHeader); + + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + String taskName = "checkRocksdbCqWriteProgress"; + CheckAsyncTaskStatusRequestHeader requestHeader1 = new CheckAsyncTaskStatusRequestHeader(); + requestHeader1.setTaskName(taskName); + RemotingCommand request1 = RemotingCommand.createRequestCommand(RequestCode.CHECK_ASYNC_TASK_STATUS, requestHeader1); + HashMap extFields = new HashMap<>(); + extFields.put("taskName",taskName); + request1.setExtFields(extFields); + RemotingCommand response1 = adminBrokerProcessor.processRequest(handlerContext, request1); + assertThat(response1.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + private ResetOffsetRequestHeader createRequestHeader(String topic,String group,long timestamp,boolean force,long offset,int queueId) { ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); requestHeader.setTopic(topic); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java index 8239a90989b..5cf1be44bc1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java @@ -38,11 +38,11 @@ public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - if (StringUtils.isBlank(taskName)) { - throw new RemotingCommandException("taskName cannot be null or blank"); + if (StringUtils.isBlank(taskName) && StringUtils.isBlank(taskId)) { + throw new RemotingCommandException("taskName and taskId cannot be empty at the same time"); } - if (maxLimit <= 0) { - throw new RemotingCommandException("maxLimit must be greater than 0"); + if (maxLimit < 0) { + throw new RemotingCommandException("maxLimit cannot be less than 0."); } if (taskStatus != null && (taskStatus < 0 || taskStatus > 3)) { throw new RemotingCommandException("taskStatus must be between 0 and 3"); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java index e543bf766c6..efeeddc01d2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java @@ -105,7 +105,17 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { String brokerAddr = commandLine.hasOption('b') ? commandLine.getOptionValue('b').trim() : null; String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null; String namesAddr = commandLine.hasOption('n') ? commandLine.getOptionValue('n').trim() : null; - int maxLimit = commandLine.hasOption('m') ? Integer.parseInt(commandLine.getOptionValue('m').trim()) : DEFAULT_MAX_TASKS; + String maxLimitStr = commandLine.hasOption('m') ? commandLine.getOptionValue('m').trim() : null; + int maxLimit = DEFAULT_MAX_TASKS; + if (maxLimitStr != null && !maxLimitStr.isEmpty()) { + try { + maxLimit = Integer.parseInt(maxLimitStr); + } catch (NumberFormatException e) { + System.out.print("Illegal maxLimit parameter value"); + return; + } + } + Integer taskStatus = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s').trim()) : null; try { @@ -146,10 +156,11 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { System.out.printf("Cluster '%s' not found or has no brokers.%n", clusterName); return; } + int finalMaxLimit = maxLimit; brokerNames.forEach(brokerName -> { BrokerData brokerData = brokerAddrTable.get(brokerName); if (brokerData != null) { - checkAsyncTaskStatusOnBroker(brokerData.selectBrokerAddr(), taskName, taskId, brokerName, maxLimit, taskStatus); + checkAsyncTaskStatusOnBroker(brokerData.selectBrokerAddr(), taskName, taskId, brokerName, finalMaxLimit, taskStatus); } }); } else {