Skip to content

Commit 5b23ab7

Browse files
authored
[improve](routine load) delay schedule EOF tasks to avoid too many small transactions (#39975) (#40498)
pick (#39975) We encountered a scenario where a large number of small transactions were generated, resulting in an impact on query performance: Kafka's data comes in batches of very small data every very short time, which leads to tasks being frequently scheduled and ending very quickly, resulting in a large number of small transactions. To solve this problem, we delay the scheduling of tasks that perceive EOF, which would not delay data consumption, for perceiving EOF indicates that the consumption speed is greater than the production speed.
1 parent ea7e236 commit 5b23ab7

File tree

4 files changed

+33
-7
lines changed

4 files changed

+33
-7
lines changed

fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> partitionId
5858
kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getTimeoutBackOffCount(),
5959
kafkaTaskInfo.getBeId(), isMultiTable);
6060
this.partitionIdToOffset = partitionIdToOffset;
61+
this.isEof = kafkaTaskInfo.getIsEof();
6162
}
6263

6364
public List<Integer> getPartitions() {

fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1247,7 +1247,7 @@ private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskIn
12471247
} else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) {
12481248
// step2: update job progress
12491249
updateProgress(rlTaskTxnCommitAttachment);
1250-
routineLoadTaskInfo.selfAdaptTimeout(rlTaskTxnCommitAttachment);
1250+
routineLoadTaskInfo.handleTaskByTxnCommitAttachment(rlTaskTxnCommitAttachment);
12511251
}
12521252

12531253
if (rlTaskTxnCommitAttachment != null && !Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) {

fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ public abstract class RoutineLoadTaskInfo {
7777
protected static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3;
7878
protected int timeoutBackOffCount = 0;
7979

80+
protected boolean isEof = false;
81+
8082
// this status will be set when corresponding transaction's status is changed.
8183
// so that user or other logic can know the status of the corresponding txn.
8284
protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
@@ -167,6 +169,10 @@ public int getTimeoutBackOffCount() {
167169
return timeoutBackOffCount;
168170
}
169171

172+
public boolean getIsEof() {
173+
return isEof;
174+
}
175+
170176
public boolean isTimeout() {
171177
if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) {
172178
// the corresponding txn is already finished, this task can not be treated as timeout.
@@ -181,7 +187,12 @@ public boolean isTimeout() {
181187
return false;
182188
}
183189

184-
public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
190+
public void handleTaskByTxnCommitAttachment(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
191+
selfAdaptTimeout(rlTaskTxnCommitAttachment);
192+
judgeEof(rlTaskTxnCommitAttachment);
193+
}
194+
195+
private void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
185196
long taskExecutionTime = rlTaskTxnCommitAttachment.getTaskExecutionTimeMs();
186197
long timeoutMs = this.timeoutMs;
187198

@@ -196,6 +207,15 @@ public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment
196207
this.timeoutMs = timeoutMs;
197208
}
198209

210+
private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
211+
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
212+
if (rlTaskTxnCommitAttachment.getTotalRows() < routineLoadJob.getMaxBatchRows()
213+
&& rlTaskTxnCommitAttachment.getReceivedBytes() < routineLoadJob.getMaxBatchSizeBytes()
214+
&& rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() < this.timeoutMs) {
215+
this.isEof = true;
216+
}
217+
}
218+
199219
abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;
200220

201221
// begin the txn of this task

fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,15 @@ private void process() throws UserException, InterruptedException {
101101
try {
102102
// This step will be blocked when queue is empty
103103
RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.take();
104-
if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime()
105-
< routineLoadTaskInfo.getTimeoutMs()) {
106-
// try to delay scheduling this task for 'timeout', to void too many failure
107-
needScheduleTasksQueue.addLast(routineLoadTaskInfo);
108-
return;
104+
// try to delay scheduling tasks that are perceived as Eof to MaxBatchInterval
105+
// to avoid to much small transaction
106+
if (routineLoadTaskInfo.getIsEof()) {
107+
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
108+
if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime()
109+
< routineLoadJob.getMaxBatchIntervalS()) {
110+
needScheduleTasksQueue.addLast(routineLoadTaskInfo);
111+
return;
112+
}
109113
}
110114
scheduleOneTask(routineLoadTaskInfo);
111115
} catch (Exception e) {
@@ -114,6 +118,7 @@ private void process() throws UserException, InterruptedException {
114118
}
115119

116120
private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exception {
121+
routineLoadTaskInfo.setLastScheduledTime(System.currentTimeMillis());
117122
if (LOG.isDebugEnabled()) {
118123
LOG.debug("schedule routine load task info {} for job {}",
119124
routineLoadTaskInfo.id, routineLoadTaskInfo.getJobId());

0 commit comments

Comments
 (0)