Skip to content

Commit 0ee0dd6

Browse files
authored
[fix](routine load) reset Kafka progress cache when routine load job topic change (#38474) (#39181)
pick (#38474) When change routine load job topic from test_topic_before to test_topic_after by ``` ALTER ROUTINE LOAD FOR test_topic_change FROM KAFKA("kafka_topic" = "test_topic_after"); ``` (test_topic_before has 5 rows and test_topic_after has 1 rows) Exception happened, which cannot consume any data: ``` 2024-07-29 15:57:28,122 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,123 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,125 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,126 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,128 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,129 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,131 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,133 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,134 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,136 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,137 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 ``` It is necessary to reset Kafka progress cache when routine load job topic change.
1 parent 5f77f90 commit 0ee0dd6

File tree

6 files changed

+186
-12
lines changed

6 files changed

+186
-12
lines changed

fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,17 @@ private void getReadableProgress(Map<Integer, String> showPartitionIdToOffset) {
118118
}
119119
}
120120

121-
// modify the partition offset of this progress.
122-
// throw exception is the specified partition does not exist in progress.
123-
public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
121+
public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
124122
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
125123
if (!partitionIdToOffset.containsKey(pair.first)) {
126124
throw new DdlException("The specified partition " + pair.first + " is not in the consumed partitions");
127125
}
128126
}
127+
}
129128

129+
// modify the partition offset of this progress.
130+
// throw exception is the specified partition does not exist in progress.
131+
public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
130132
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
131133
partitionIdToOffset.put(pair.first, pair.second);
132134
}

fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java

+19-9
Original file line numberDiff line numberDiff line change
@@ -692,22 +692,32 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
692692
customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
693693
}
694694

695-
// modify partition offset first
696-
if (!kafkaPartitionOffsets.isEmpty()) {
697-
// we can only modify the partition that is being consumed
698-
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
699-
}
700-
695+
// convertCustomProperties and check partitions before reset progress to make modify operation atomic
701696
if (!customKafkaProperties.isEmpty()) {
702697
this.customProperties.putAll(customKafkaProperties);
703698
convertCustomProperties(true);
704699
}
705-
// modify broker list and topic
706-
if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
707-
this.brokerList = dataSourceProperties.getBrokerList();
700+
701+
if (!kafkaPartitionOffsets.isEmpty()) {
702+
((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
708703
}
704+
705+
// It is necessary to reset the Kafka progress cache if topic change,
706+
// and should reset cache before modifying partition offset.
709707
if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
710708
this.topic = dataSourceProperties.getTopic();
709+
this.progress = new KafkaProgress();
710+
}
711+
712+
// modify partition offset
713+
if (!kafkaPartitionOffsets.isEmpty()) {
714+
// we can only modify the partition that is being consumed
715+
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
716+
}
717+
718+
// modify broker list
719+
if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
720+
this.brokerList = dataSourceProperties.getBrokerList();
711721
}
712722
}
713723
if (!jobProperties.isEmpty()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !sql_topic_change --
3+
1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
4+
2 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
5+
3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
6+
4 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
7+
5 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
8+
9+
-- !sql_topic_change1 --
10+
1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
11+
2 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
12+
3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
13+
4 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
14+
5 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
15+
6 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
16+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
6,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
2+
2,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
3+
3,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
4+
4,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
5+
5,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.kafka.clients.admin.AdminClient
19+
import org.apache.kafka.clients.producer.KafkaProducer
20+
import org.apache.kafka.clients.producer.ProducerRecord
21+
import org.apache.kafka.clients.producer.ProducerConfig
22+
23+
suite("test_routine_load_topic_change","p0") {
24+
// send data to Kafka
25+
def kafkaCsvTpoics = [
26+
"test_topic_before",
27+
"test_topic_after",
28+
]
29+
String enabled = context.config.otherConfigs.get("enableKafkaTest")
30+
String kafka_port = context.config.otherConfigs.get("kafka_port")
31+
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
32+
def kafka_broker = "${externalEnvIp}:${kafka_port}"
33+
if (enabled != null && enabled.equalsIgnoreCase("true")) {
34+
// define kafka
35+
def props = new Properties()
36+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
37+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
38+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
39+
// Create kafka producer
40+
def producer = new KafkaProducer<>(props)
41+
42+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
43+
def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
44+
def lines = txt.readLines()
45+
lines.each { line ->
46+
logger.info("=====${line}========")
47+
def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
48+
producer.send(record)
49+
}
50+
}
51+
}
52+
53+
// test create routine load job with enclose and escape
54+
def tableName = "test_routine_load_topic_change"
55+
sql """ DROP TABLE IF EXISTS ${tableName} """
56+
sql """
57+
CREATE TABLE IF NOT EXISTS ${tableName} (
58+
`k1` int(20) NULL,
59+
`k2` string NULL,
60+
`v1` date NULL,
61+
`v2` string NULL,
62+
`v3` datetime NULL,
63+
`v4` string NULL
64+
) ENGINE=OLAP
65+
DUPLICATE KEY(`k1`)
66+
COMMENT 'OLAP'
67+
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
68+
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
69+
"""
70+
71+
if (enabled != null && enabled.equalsIgnoreCase("true")) {
72+
def jobName = "test_topic_change"
73+
try {
74+
sql """
75+
CREATE ROUTINE LOAD ${jobName} on ${tableName}
76+
COLUMNS TERMINATED BY ","
77+
PROPERTIES
78+
(
79+
"max_batch_interval" = "5",
80+
"max_batch_rows" = "300000",
81+
"max_batch_size" = "209715200"
82+
)
83+
FROM KAFKA
84+
(
85+
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
86+
"kafka_topic" = "${kafkaCsvTpoics[0]}",
87+
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
88+
);
89+
"""
90+
sql "sync"
91+
92+
def count = 0
93+
while (true) {
94+
def res = sql "select count(*) from ${tableName}"
95+
def state = sql "show routine load for ${jobName}"
96+
log.info("routine load state: ${state[0][8].toString()}".toString())
97+
log.info("routine load statistic: ${state[0][14].toString()}".toString())
98+
log.info("reason of state changed: ${state[0][17].toString()}".toString())
99+
if (res[0][0] > 0) {
100+
break
101+
}
102+
if (count >= 120) {
103+
log.error("routine load can not visible for long time")
104+
assertEquals(20, res[0][0])
105+
break
106+
}
107+
sleep(1000)
108+
count++
109+
}
110+
qt_sql_topic_change "select * from ${tableName} order by k1"
111+
112+
sql "pause routine load for ${jobName}"
113+
def res = sql "show routine load for ${jobName}"
114+
log.info("routine load job properties: ${res[0][11].toString()}".toString())
115+
sql "ALTER ROUTINE LOAD FOR ${jobName} FROM KAFKA(\"kafka_topic\" = \"${kafkaCsvTpoics[1]}\", \"property.kafka_default_offsets\" = \"OFFSET_BEGINNING\");"
116+
sql "resume routine load for ${jobName}"
117+
count = 0
118+
while (true) {
119+
res = sql "select count(*) from ${tableName}"
120+
def state = sql "show routine load for ${jobName}"
121+
log.info("routine load state: ${state[0][8].toString()}".toString())
122+
log.info("routine load statistic: ${state[0][14].toString()}".toString())
123+
log.info("reason of state changed: ${state[0][17].toString()}".toString())
124+
if (res[0][0] > 5) {
125+
break
126+
}
127+
if (count >= 120) {
128+
log.error("routine load can not visible for long time")
129+
assertEquals(20, res[0][0])
130+
break
131+
}
132+
sleep(1000)
133+
count++
134+
}
135+
qt_sql_topic_change1 "select * from ${tableName} order by k1"
136+
} finally {
137+
sql "stop routine load for ${jobName}"
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)