Skip to content

Commit f1c8426

Browse files
authored
[Bug](predicate) fix wrong result of AcceptNullPredicate (#39497)
## Proposed changes: fix the wrong result of AcceptNullPredicate
1 parent eb5cd76 commit f1c8426

File tree

4 files changed

+140
-22
lines changed

4 files changed

+140
-22
lines changed

be/src/olap/accept_null_predicate.h

+15-20
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ namespace doris {
3434
* but pass (set/return true) for NULL value rows.
3535
*
3636
* At parent, it's used for topn runtime predicate.
37+
* E.g.: the original input indices are '1,2,3,7,8,9' and the value at index 9 is null; the nested predicate outputs indices '1,2,3', but we finally output '1,2,3,9'
3738
*/
3839
class AcceptNullPredicate : public ColumnPredicate {
3940
ENABLE_FACTORY_CREATOR(AcceptNullPredicate);
@@ -44,8 +45,6 @@ class AcceptNullPredicate : public ColumnPredicate {
4445

4546
PredicateType type() const override { return _nested->type(); }
4647

47-
void set_nested(ColumnPredicate* nested) { _nested.reset(nested); }
48-
4948
Status evaluate(BitmapIndexIterator* iterator, uint32_t num_rows,
5049
roaring::Roaring* roaring) const override {
5150
return _nested->evaluate(iterator, num_rows, roaring);
@@ -64,11 +63,14 @@ class AcceptNullPredicate : public ColumnPredicate {
6463
void evaluate_and(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
6564
bool* flags) const override {
6665
if (column.has_null()) {
66+
std::vector<uint8_t> original_flags(size);
67+
memcpy(original_flags.data(), flags, size);
68+
6769
const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
6870
_nested->evaluate_and(nullable_col.get_nested_column(), sel, size, flags);
6971
const auto& nullmap = nullable_col.get_null_map_data();
7072
for (uint16_t i = 0; i < size; ++i) {
71-
flags[i] |= nullmap[sel[i]];
73+
flags[i] |= (original_flags[i] && nullmap[sel[i]]);
7274
}
7375
} else {
7476
_nested->evaluate_and(column, sel, size, flags);
@@ -77,20 +79,7 @@ class AcceptNullPredicate : public ColumnPredicate {
7779

7880
void evaluate_or(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
7981
bool* flags) const override {
80-
if (column.has_null()) {
81-
const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
82-
_nested->evaluate_or(nullable_col.get_nested_column(), sel, size, flags);
83-
84-
// call evaluate_or and set true for NULL rows
85-
for (uint16_t i = 0; i < size; ++i) {
86-
uint16_t idx = sel[i];
87-
if (!flags[i] && nullable_col.is_null_at(idx)) {
88-
flags[i] = true;
89-
}
90-
}
91-
} else {
92-
_nested->evaluate_or(column, sel, size, flags);
93-
}
82+
DCHECK(false) << "should not reach here";
9483
}
9584

9685
bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const override {
@@ -158,21 +147,27 @@ class AcceptNullPredicate : public ColumnPredicate {
158147
}
159148
// create selected_flags
160149
uint16_t max_idx = sel[size - 1];
150+
std::vector<uint16_t> old_sel(size);
151+
memcpy(old_sel.data(), sel, sizeof(uint16_t) * size);
161152

162153
const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
163154
// call nested predicate evaluate
164155
uint16_t new_size = _nested->evaluate(nullable_col.get_nested_column(), sel, size);
165156

166157
// process NULL values
167158
if (new_size < size) {
168-
std::vector<uint8_t> selected(max_idx + 1);
169-
memcpy(selected.data(), nullable_col.get_null_map_data().data(),
170-
(max_idx + 1) * sizeof(bool));
159+
std::vector<uint8_t> selected(max_idx + 1, 0);
160+
const auto* nullmap = nullable_col.get_null_map_data().data();
171161
// add rows selected by _nested->evaluate
172162
for (uint16_t i = 0; i < new_size; ++i) {
173163
uint16_t row_idx = sel[i];
174164
selected[row_idx] = true;
175165
}
166+
// reset null from original data
167+
for (uint16_t i = 0; i < size; ++i) {
168+
uint16_t row_idx = old_sel[i];
169+
selected[row_idx] |= nullmap[row_idx];
170+
}
176171

177172
// recalculate new_size and sel array
178173
new_size = 0;

be/src/olap/shared_predicate.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,9 @@ class SharedPredicate : public ColumnPredicate {
167167
std::string _debug_string() const override {
168168
std::shared_lock<std::shared_mutex> lock(_mtx);
169169
if (!_nested) {
170-
return "shared_predicate<unknow>";
170+
return "shared_predicate(unknow)";
171171
}
172-
return "shared_predicate<" + _nested->debug_string() + ">";
172+
return "shared_predicate(" + _nested->debug_string() + ")";
173173
}
174174

175175
mutable std::shared_mutex _mtx;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !test --
3+
100 dd 100 0
4+
1000 dd 1000 0
5+
10000 dd 10000 0
6+
10001 dd 10001 0
7+
10002 dd 10002 0
8+
10003 dd 10003 0
9+
10004 dd 10004 0
10+
10005 dd 10005 0
11+
10006 dd 10006 0
12+
10007 dd 10007 0
13+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.codehaus.groovy.runtime.IOGroovyMethods
19+
20+
suite ("accept_null") {
    // Regression test for AcceptNullPredicate: after a delete (which marks rows
    // via __DORIS_DELETE_SIGN__) and a full compaction, a topn runtime predicate
    // over a nullable column must not resurrect or drop rows incorrectly.
    sql """ drop table IF EXISTS detail_tmp;"""

    sql """
    CREATE TABLE `detail_tmp` (
      `id` VARCHAR(512) NOT NULL,
      `accident_no` VARCHAR(512) NULL,
      `accident_type_name` VARCHAR(512) NULL
    ) ENGINE=OLAP
    UNIQUE KEY(`id`)
    DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
    PROPERTIES (
      "replication_allocation" = "tag.location.default: 1",
      "min_load_replica_num" = "-1",
      "is_being_synced" = "false",
      "storage_medium" = "hdd",
      "storage_format" = "V2",
      "inverted_index_storage_format" = "V1",
      "enable_unique_key_merge_on_write" = "true",
      "light_schema_change" = "true",
      "disable_auto_compaction" = "false",
      "enable_single_replica_compaction" = "false",
      "group_commit_interval_ms" = "10000",
      "group_commit_data_bytes" = "134217728",
      "enable_mow_light_delete" = "false"
    );
    """

    // Load 100k rows, then delete a prefix so delete-sign filtering is exercised.
    sql "insert into detail_tmp(id,accident_type_name,accident_no) select e1,'dd',e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1;"
    sql "delete from detail_tmp where accident_no <100;"

    def tablets = sql_return_maparray """ show tablets from detail_tmp; """

    // Sanity-check each tablet's compaction status endpoint before compacting.
    for (def tablet : tablets) {
        def (code, out, err) = curl("GET", tablet.CompactionStatus)
        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
        // JUnit convention: expected value first.
        assertEquals(0, code)
        def tabletJson = parseJson(out.trim())
        assert tabletJson.rowsets instanceof List
    }

    // Discover backends and whether auto compaction is disabled on them, so we
    // know if a manual full-compaction request is required to succeed.
    def backendId_to_backendIP = [:]
    def backendId_to_backendHttpPort = [:]
    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
    boolean disableAutoCompaction = true
    for (int i = 0; i < backendId_to_backendIP.keySet().size(); i++) {
        def backend_id = backendId_to_backendIP.keySet()[i]
        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(0, code)
        def configList = parseJson(out.trim())
        assert configList instanceof List

        for (Object ele in (List) configList) {
            assert ele instanceof List<String>
            if (((List<String>) ele)[0] == "disable_auto_compaction") {
                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
            }
        }
    }

    // Trigger a full compaction per tablet's table, retrying up to 10 times.
    for (def tablet : tablets) {
        String tablet_id = tablet.TabletId
        def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """
        logger.info("tablet" + tablet_info)
        def table_id = tablet_info[0].TableId
        def backend_id = tablet.BackendId
        def times = 1
        def code, out, err
        do {
            (code, out, err) = be_run_full_compaction_by_table_id(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), table_id)
            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
            ++times
            sleep(2000)
        } while (parseJson(out.trim()).status.toLowerCase() != "success" && times <= 10)

        def compactJson = parseJson(out.trim())
        if (compactJson.status.toLowerCase() == "fail") {
            // A manual-compaction failure is only acceptable when auto
            // compaction already did the work.
            assertEquals(false, disableAutoCompaction)
            logger.info("Compaction was done automatically!")
        }
        if (disableAutoCompaction) {
            assertEquals("success", compactJson.status.toLowerCase())
        }
    }

    // The ordered topn query must return the surviving (non-deleted) rows;
    // compared against the checked-in .out file.
    qt_test "select id,accident_type_name,accident_no,__DORIS_DELETE_SIGN__ From detail_tmp where accident_type_name = 'dd' order by accident_no,id limit 10;"
}

0 commit comments

Comments
 (0)