From 79c48aa64a6934ed17eecabc77bbc8e92a5573a5 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 5 Jul 2024 22:21:11 +0800 Subject: [PATCH] fix --- be/src/olap/delete_handler.cpp | 5 ++ .../java/org/apache/doris/load/DeleteJob.java | 6 ++ .../org/apache/doris/master/MasterImpl.java | 28 +++++++-- .../java/org/apache/doris/task/PushTask.java | 12 ++++ .../test_delete_from_timeout.groovy | 59 +++++++++++++++++++ 5 files changed, 104 insertions(+), 6 deletions(-) create mode 100644 regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp index 6e390874126d36..8d85eb84bab7ff 100644 --- a/be/src/olap/delete_handler.cpp +++ b/be/src/olap/delete_handler.cpp @@ -35,6 +35,7 @@ #include "olap/predicate_creator.h" #include "olap/tablet_schema.h" #include "olap/utils.h" +#include "util/debug_points.h" using apache::thrift::ThriftDebugString; using std::vector; @@ -90,6 +91,10 @@ std::string trans_op(const std::string& opt) { Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema, const std::vector& conditions, DeletePredicatePB* del_pred) { + DBUG_EXECUTE_IF("DeleteHandler::generate_delete_predicate.inject_failure", { + return Status::Error(dp->param("error_code"), + dp->param("error_msg")); + }) if (conditions.empty()) { return Status::Error( "invalid parameters for store_cond. condition_size={}", conditions.size()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index f9c94284db89df..761db0f4725dbe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -373,6 +373,12 @@ public void await() throws Exception { long timeoutMs = getTimeoutMs(); boolean ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS); if (ok) { + if (!countDownLatch.getStatus().ok()) { + // encounter some errors that don't need to retry, abort directly + LOG.warn("delete job failed, errmsg={}", countDownLatch.getStatus().getErrorMsg()); + throw new UserException(String.format("delete job failed, errmsg:%s", + countDownLatch.getStatus().getErrorMsg())); + } return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 485463d8daf157..12d908ff317d84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -137,7 +137,8 @@ public TMasterResult finishTask(TFinishTaskRequest request) { && taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE && taskType != TTaskType.CLONE && taskType != TTaskType.PUBLISH_VERSION && taskType != TTaskType.CREATE && taskType != TTaskType.UPDATE_TABLET_META_INFO - && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) { + && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE + && taskType != TTaskType.REALTIME_PUSH) { return result; } } @@ -150,7 +151,6 @@ public TMasterResult finishTask(TFinishTaskRequest request) { finishCreateReplica(task, request); break; case REALTIME_PUSH: - checkHasTabletInfo(request); Preconditions.checkState(request.isSetReportVersion()); finishRealtimePush(task, request); break; @@ -295,16 +295,32 @@ private void finishUpdateTabletMeta(AgentTask task, TFinishTaskRequest request) } } - private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) { - List finishTabletInfos = request.getFinishTabletInfos(); - Preconditions.checkState(finishTabletInfos != null && !finishTabletInfos.isEmpty()); - + private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) throws Exception { PushTask pushTask = (PushTask) task; long dbId = pushTask.getDbId(); long backendId = pushTask.getBackendId(); long signature = task.getSignature(); long transactionId = ((PushTask) task).getTransactionId(); + + if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) { + if (pushTask.getPushType() == TPushType.DELETE) { + // DeleteHandler may return status code DELETE_INVALID_CONDITION and DELETE_INVALID_PARAMETERS, + // we don't need to retry if meet them. + // note that they will be converted to TStatusCode.INTERNAL_ERROR when being sent from be to fe + if (request.getTaskStatus().getStatusCode() == TStatusCode.INTERNAL_ERROR) { + pushTask.countDownToZero(request.getTaskStatus().getStatusCode(), + task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString()); + AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature); + LOG.warn("finish push replica error: {}", request.getTaskStatus().getErrorMsgs().toString()); + } + } + return; + } + + checkHasTabletInfo(request); + List finishTabletInfos = request.getFinishTabletInfos(); + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db == null) { AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java index ca29cc78e6f4d8..df361593b49ece 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.Predicate; import org.apache.doris.analysis.SlotRef; import org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.Status; import org.apache.doris.thrift.TBrokerScanRange; import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TCondition; @@ -34,6 +35,7 @@ import org.apache.doris.thrift.TPushReq; import org.apache.doris.thrift.TPushType; import org.apache.doris.thrift.TResourceInfo; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TTaskType; import com.google.common.collect.Maps; @@ -211,6 +213,16 @@ public void countDownLatch(long backendId, long tabletId) { } } + // call this always means one of tasks is failed. count down to zero to finish entire task + public void countDownToZero(TStatusCode code, String errMsg) { + if (this.latch != null) { + latch.countDownToZero(new Status(code, errMsg)); + if (LOG.isDebugEnabled()) { + LOG.debug("PushTask count down to zero. error msg: {}", errMsg); + } + } + } + public long getReplicaId() { return replicaId; } diff --git a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy new file mode 100644 index 00000000000000..2d5bf41b3db34f --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_delete_from_timeout","nonConcurrent") { + + def tableName = "test_delete_from_timeout" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE ${tableName} ( + `col1` BOOLEAN NOT NULL, + `col2` DECIMAL(17, 1) NOT NULL, + `col3` INT NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`col1`, `col2`, `col3`) + DISTRIBUTED BY HASH(`col1`, `col2`, `col3`) BUCKETS 4 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1") + """ + + GetDebugPoint().clearDebugPointsForAllBEs() + + try { + sql "insert into ${tableName} values(1, 99.9, 234);" + GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure", + [error_code: -1900 /* DELETE_INVALID_CONDITION */, error_msg: "data type is float or double."]) + test { + sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """ + exception "data type is float or double." + } + + GetDebugPoint().clearDebugPointsForAllBEs() + + GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure", + [error_code: -1903 /* DELETE_INVALID_PARAMETERS */, error_msg: "invalid parameters for store_cond. condition_size=1"]) + test { + sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """ + exception "invalid parameters for store_cond. condition_size=1" + } + } catch (Exception e) { + logger.info(e.getMessage()) + AssertTrue(false) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure") + } +} \ No newline at end of file