Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-2.1] Picks "[Fix](delete) Fix delete job timeout when executing delete from ... #37363" #37374

Merged
merged 1 commit into from
Jul 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "olap/predicate_creator.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/debug_points.h"

using apache::thrift::ThriftDebugString;
using std::vector;
Expand Down Expand Up @@ -90,6 +91,10 @@ std::string trans_op(const std::string& opt) {
Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema,
const std::vector<TCondition>& conditions,
DeletePredicatePB* del_pred) {
DBUG_EXECUTE_IF("DeleteHandler::generate_delete_predicate.inject_failure", {
return Status::Error<false>(dp->param<int>("error_code"),
dp->param<std::string>("error_msg"));
})
if (conditions.empty()) {
return Status::Error<DELETE_INVALID_PARAMETERS>(
"invalid parameters for store_cond. condition_size={}", conditions.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,12 @@ public void await() throws Exception {
long timeoutMs = getTimeoutMs();
boolean ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (ok) {
if (!countDownLatch.getStatus().ok()) {
// encounter some errors that don't need to retry, abort directly
LOG.warn("delete job failed, errmsg={}", countDownLatch.getStatus().getErrorMsg());
throw new UserException(String.format("delete job failed, errmsg:%s",
countDownLatch.getStatus().getErrorMsg()));
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
&& taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE
&& taskType != TTaskType.CLONE && taskType != TTaskType.PUBLISH_VERSION
&& taskType != TTaskType.CREATE && taskType != TTaskType.UPDATE_TABLET_META_INFO
&& taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) {
&& taskType != TTaskType.STORAGE_MEDIUM_MIGRATE
&& taskType != TTaskType.REALTIME_PUSH) {
return result;
}
}
Expand All @@ -150,7 +151,6 @@ public TMasterResult finishTask(TFinishTaskRequest request) {
finishCreateReplica(task, request);
break;
case REALTIME_PUSH:
checkHasTabletInfo(request);
Preconditions.checkState(request.isSetReportVersion());
finishRealtimePush(task, request);
break;
Expand Down Expand Up @@ -295,16 +295,32 @@ private void finishUpdateTabletMeta(AgentTask task, TFinishTaskRequest request)
}
}

private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) {
List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();
Preconditions.checkState(finishTabletInfos != null && !finishTabletInfos.isEmpty());

private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) throws Exception {
PushTask pushTask = (PushTask) task;

long dbId = pushTask.getDbId();
long backendId = pushTask.getBackendId();
long signature = task.getSignature();
long transactionId = ((PushTask) task).getTransactionId();

if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
if (pushTask.getPushType() == TPushType.DELETE) {
// DeleteHandler may return status code DELETE_INVALID_CONDITION and DELETE_INVALID_PARAMETERS,
// we don't need to retry if meet them.
// note that they will be converted to TStatusCode.INTERNAL_ERROR when being sent from be to fe
if (request.getTaskStatus().getStatusCode() == TStatusCode.INTERNAL_ERROR) {
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString());
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
LOG.warn("finish push replica error: {}", request.getTaskStatus().getErrorMsgs().toString());
}
}
return;
}

checkHasTabletInfo(request);
List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();

Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.analysis.Predicate;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Status;
import org.apache.doris.thrift.TBrokerScanRange;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TCondition;
Expand All @@ -34,6 +35,7 @@
import org.apache.doris.thrift.TPushReq;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;

import com.google.common.collect.Maps;
Expand Down Expand Up @@ -211,6 +213,16 @@ public void countDownLatch(long backendId, long tabletId) {
}
}

// call this always means one of tasks is failed. count down to zero to finish entire task
public void countDownToZero(TStatusCode code, String errMsg) {
if (this.latch != null) {
latch.countDownToZero(new Status(code, errMsg));
if (LOG.isDebugEnabled()) {
LOG.debug("PushTask count down to zero. error msg: {}", errMsg);
}
}
}

public long getReplicaId() {
return replicaId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_delete_from_timeout","nonConcurrent") {

def tableName = "test_delete_from_timeout"

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """ CREATE TABLE ${tableName} (
`col1` BOOLEAN NOT NULL,
`col2` DECIMAL(17, 1) NOT NULL,
`col3` INT NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(`col1`, `col2`, `col3`)
DISTRIBUTED BY HASH(`col1`, `col2`, `col3`) BUCKETS 4
PROPERTIES (
"replication_allocation" = "tag.location.default: 1")
"""

GetDebugPoint().clearDebugPointsForAllBEs()

try {
sql "insert into ${tableName} values(1, 99.9, 234);"
GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
[error_code: -1900 /* DELETE_INVALID_CONDITION */, error_msg: "data type is float or double."])
test {
sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
exception "data type is float or double."
}

GetDebugPoint().clearDebugPointsForAllBEs()

GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
[error_code: -1903 /* DELETE_INVALID_PARAMETERS */, error_msg: "invalid parameters for store_cond. condition_size=1"])
test {
sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
exception "invalid parameters for store_cond. condition_size=1"
}
} catch (Exception e) {
logger.info(e.getMessage())
AssertTrue(false)
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure")
}
}
Loading