Skip to content

Commit 5392cb1

Browse files
authored
[fix](mtmv)fix nested mtmv not refresh (#40433)
fix nested mtmv not refresh because the partition version remains unchanged after inserting overwrite for the underlying materialized view we add partitionId in snapshot
1 parent 6a19a37 commit 5392cb1

File tree

8 files changed

+208
-4
lines changed

8 files changed

+208
-4
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -3147,9 +3147,10 @@ public List<Column> getPartitionColumns() {
31473147
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
31483148
throws AnalysisException {
31493149
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
3150+
long partitionId = getPartitionOrAnalysisException(partitionName).getId();
31503151
long visibleVersion = partitionVersions.containsKey(partitionName) ? partitionVersions.get(partitionName)
31513152
: getPartitionOrAnalysisException(partitionName).getVisibleVersion();
3152-
return new MTMVVersionSnapshot(visibleVersion);
3153+
return new MTMVVersionSnapshot(visibleVersion, partitionId);
31533154
}
31543155

31553156
@Override

fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java

+44-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package org.apache.doris.mtmv;
1919

2020
import org.apache.doris.catalog.MTMV;
21+
import org.apache.doris.catalog.OlapTable;
22+
import org.apache.doris.catalog.Partition;
23+
import org.apache.doris.common.AnalysisException;
2124

2225
import com.google.common.collect.Maps;
2326
import com.google.gson.annotations.SerializedName;
@@ -74,6 +77,46 @@ public String toString() {
7477
}
7578

7679
public void compatible(MTMV mtmv) {
80+
try {
81+
// snapshot add partitionId resolve problem of insert overwrite
82+
compatiblePartitions(mtmv);
83+
} catch (Throwable e) {
84+
LOG.warn("MTMV compatiblePartitions failed, mtmv: {}", mtmv.getName(), e);
85+
}
86+
try {
87+
// change table id to BaseTableInfo
88+
compatibleTables(mtmv);
89+
} catch (Throwable e) {
90+
LOG.warn("MTMV compatibleTables failed, mtmv: {}", mtmv.getName(), e);
91+
}
92+
}
93+
94+
private void compatiblePartitions(MTMV mtmv) throws AnalysisException {
95+
if (!checkHasDataWithoutPartitionId()) {
96+
return;
97+
}
98+
OlapTable relatedTable = (OlapTable) mtmv.getMvPartitionInfo().getRelatedTable();
99+
for (Entry<String, MTMVSnapshotIf> entry : partitions.entrySet()) {
100+
MTMVVersionSnapshot versionSnapshot = (MTMVVersionSnapshot) entry.getValue();
101+
if (versionSnapshot.getId() == 0) {
102+
Partition partition = relatedTable.getPartition(entry.getKey());
103+
if (partition != null) {
104+
(versionSnapshot).setId(partition.getId());
105+
}
106+
}
107+
}
108+
}
109+
110+
private boolean checkHasDataWithoutPartitionId() {
111+
for (MTMVSnapshotIf snapshot : partitions.values()) {
112+
if (snapshot instanceof MTMVVersionSnapshot && ((MTMVVersionSnapshot) snapshot).getId() == 0) {
113+
return true;
114+
}
115+
}
116+
return false;
117+
}
118+
119+
private void compatibleTables(MTMV mtmv) {
77120
if (tables.size() == tablesInfo.size()) {
78121
return;
79122
}
@@ -87,7 +130,7 @@ public void compatible(MTMV mtmv) {
87130
if (tableInfo.isPresent()) {
88131
tablesInfo.put(tableInfo.get(), entry.getValue());
89132
} else {
90-
LOG.warn("MTMV compatible failed, tableId: {}, relationTables: {}", entry.getKey(),
133+
LOG.warn("MTMV compatibleTables failed, tableId: {}, relationTables: {}", entry.getKey(),
91134
relation.getBaseTablesOneLevel());
92135
}
93136
}

fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java

+23-2
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,30 @@ public class MTMVVersionSnapshot implements MTMVSnapshotIf {
2424
@SerializedName("v")
2525
private long version;
2626

27+
// The partition version after insert overwrite is 1,
28+
// which may cause the upper level materialized view to be unaware of changes in the data at the bottom level.
29+
// However, the partition ID after overwrite will change, so the partition ID should be added.
30+
// only for partition, table will always 0
31+
@SerializedName("id")
32+
private long id;
33+
2734
public MTMVVersionSnapshot(long version) {
2835
this.version = version;
2936
}
3037

38+
public MTMVVersionSnapshot(long version, long id) {
39+
this.version = version;
40+
this.id = id;
41+
}
42+
43+
public long getId() {
44+
return id;
45+
}
46+
47+
public void setId(long id) {
48+
this.id = id;
49+
}
50+
3151
@Override
3252
public boolean equals(Object o) {
3353
if (this == o) {
@@ -37,18 +57,19 @@ public boolean equals(Object o) {
3757
return false;
3858
}
3959
MTMVVersionSnapshot that = (MTMVVersionSnapshot) o;
40-
return version == that.version;
60+
return version == that.version && id == that.id;
4161
}
4262

4363
@Override
4464
public int hashCode() {
45-
return Objects.hashCode(version);
65+
return Objects.hashCode(version, id);
4666
}
4767

4868
@Override
4969
public String toString() {
5070
return "MTMVVersionSnapshot{"
5171
+ "version=" + version
72+
+ ", id=" + id
5273
+ '}';
5374
}
5475
}

regression-test/data/mtmv_p0/test_multi_level_mtmv.out

+11
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,17 @@
1111
-- !mv2_should_one_partition --
1212
["p_2"]
1313

14+
-- !mv1_should_one_partition_again --
15+
["p_2"]
16+
17+
-- !mv2_should_one_partition_again --
18+
["p_2"]
19+
20+
-- !mv2_again --
21+
1 1
22+
2 2
23+
2 3
24+
1425
-- !status1 --
1526
multi_level_mtmv1 SCHEMA_CHANGE SUCCESS
1627

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !refresh_init --
3+
1 2017-01-15 1
4+
2 2017-02-15 2
5+
3 2017-03-15 3
6+
7+
-- !mtmv_sync --
8+
true
9+

regression-test/suites/mtmv_p0/test_multi_level_mtmv.groovy

+16
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,22 @@ suite("test_multi_level_mtmv") {
8787
waitingMTMVTaskFinishedByMvName(mv2)
8888
order_qt_mv2_should_one_partition "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv2}' order by CreateTime desc limit 1"
8989

90+
// insert into p2 again, check partition version if change
91+
sql """
92+
INSERT INTO ${tableName} VALUES(2,3);
93+
"""
94+
sql """
95+
REFRESH MATERIALIZED VIEW ${mv1} AUTO
96+
"""
97+
waitingMTMVTaskFinishedByMvName(mv1)
98+
order_qt_mv1_should_one_partition_again "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv1}' order by CreateTime desc limit 1"
99+
sql """
100+
REFRESH MATERIALIZED VIEW ${mv2} AUTO
101+
"""
102+
waitingMTMVTaskFinishedByMvName(mv2)
103+
order_qt_mv2_should_one_partition_again "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv2}' order by CreateTime desc limit 1"
104+
order_qt_mv2_again "select * from ${mv2}"
105+
90106
// drop table
91107
sql """
92108
drop table ${tableName}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_upgrade_downgrade_prepare_olap_mtmv","p0,mtmv,restart_fe") {
19+
String suiteName = "mtmv_up_down_olap"
20+
String mvName = "${suiteName}_mtmv"
21+
String tableName = "${suiteName}_table"
22+
String tableName2 = "${suiteName}_table2"
23+
24+
sql """drop materialized view if exists ${mvName};"""
25+
sql """drop table if exists `${tableName}`"""
26+
sql """drop table if exists `${tableName2}`"""
27+
28+
sql """
29+
CREATE TABLE `${tableName}` (
30+
`user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
31+
`date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"',
32+
`num` SMALLINT NOT NULL COMMENT '\"数量\"'
33+
) ENGINE=OLAP
34+
DUPLICATE KEY(`user_id`, `date`, `num`)
35+
COMMENT 'OLAP'
36+
PARTITION BY RANGE(`date`)
37+
(PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')),
38+
PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')),
39+
PARTITION p201703_all VALUES [('2017-03-01'), ('2017-04-01')))
40+
DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
41+
PROPERTIES ('replication_num' = '1') ;
42+
"""
43+
sql """
44+
insert into ${tableName} values(1,"2017-01-15",1),(2,"2017-02-15",2),(3,"2017-03-15",3);
45+
"""
46+
47+
sql """
48+
CREATE TABLE `${tableName2}` (
49+
`user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
50+
`age` SMALLINT NOT NULL COMMENT '\"年龄\"'
51+
) ENGINE=OLAP
52+
DUPLICATE KEY(`user_id`, `age`)
53+
COMMENT 'OLAP'
54+
DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
55+
PROPERTIES ('replication_num' = '1') ;
56+
"""
57+
sql """
58+
insert into ${tableName2} values(1,1),(2,2),(3,3);
59+
"""
60+
61+
sql """
62+
CREATE MATERIALIZED VIEW ${mvName}
63+
REFRESH AUTO ON MANUAL
64+
partition by(`date`)
65+
DISTRIBUTED BY RANDOM BUCKETS 2
66+
PROPERTIES ('replication_num' = '1')
67+
AS
68+
SELECT a.* FROM ${tableName} a inner join ${tableName2} b on a.user_id=b.user_id;
69+
"""
70+
waitingMTMVTaskFinishedByMvName(mvName)
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_upgrade_downgrade_olap_mtmv","p0,mtmv,restart_fe") {
19+
String suiteName = "mtmv_up_down_olap"
20+
String dbName = context.config.getDbNameByFile(context.file)
21+
String mvName = "${suiteName}_mtmv"
22+
String tableName = "${suiteName}_table"
23+
// test data is normal
24+
order_qt_refresh_init "SELECT * FROM ${mvName}"
25+
// test is sync
26+
order_qt_mtmv_sync "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'"
27+
sql """
28+
REFRESH MATERIALIZED VIEW ${mvName} complete
29+
"""
30+
// test can refresh success
31+
waitingMTMVTaskFinishedByMvName(mvName)
32+
}

0 commit comments

Comments
 (0)