Skip to content

Commit ef5094a

Browse files
luwei16dataroaring
authored andcommitted
[fix](compaction) fix the longest continuous rowsets cannot be selected when missing rowsets (#38728)
### problem When version is missing, the code for selecting the longest continuous version has a bug. Only the version before the missing version will be selected. For example: the current version is version [1-1], version [2-2], version [4-4], version [5-5], version [6-6], version [7-7], and version [3-3] is missing. The current result is to return version [1-1], version [2-2] instead of version [4-4], version [5-5], version [6-6], version [7-7]
1 parent e270be8 commit ef5094a

File tree

3 files changed

+291
-12
lines changed

3 files changed

+291
-12
lines changed

be/src/olap/cumulative_compaction.cpp

+22-12
Original file line numberDiff line numberDiff line change
@@ -35,33 +35,42 @@
3535
namespace doris {
3636
using namespace ErrorCode;
3737

38-
namespace {
39-
40-
void find_longest_consecutive_version(std::vector<RowsetSharedPtr>* rowsets,
41-
std::vector<Version>* missing_version) {
38+
void CumulativeCompaction::find_longest_consecutive_version(std::vector<RowsetSharedPtr>* rowsets,
39+
std::vector<Version>* missing_version) {
4240
if (rowsets->empty()) {
4341
return;
4442
}
4543

4644
RowsetSharedPtr prev_rowset = rowsets->front();
4745
size_t i = 1;
46+
int max_start = 0;
47+
int max_length = 1;
48+
49+
int start = 0;
50+
int length = 1;
4851
for (; i < rowsets->size(); ++i) {
4952
RowsetSharedPtr rowset = (*rowsets)[i];
5053
if (rowset->start_version() != prev_rowset->end_version() + 1) {
5154
if (missing_version != nullptr) {
5255
missing_version->push_back(prev_rowset->version());
5356
missing_version->push_back(rowset->version());
5457
}
55-
break;
58+
start = i;
59+
length = 1;
60+
} else {
61+
length++;
62+
}
63+
64+
if (length > max_length) {
65+
max_start = start;
66+
max_length = length;
5667
}
68+
5769
prev_rowset = rowset;
5870
}
59-
60-
rowsets->resize(i);
71+
*rowsets = {rowsets->begin() + max_start, rowsets->begin() + max_start + max_length};
6172
}
6273

63-
} // namespace
64-
6574
CumulativeCompaction::CumulativeCompaction(StorageEngine& engine, const TabletSharedPtr& tablet)
6675
: CompactionMixin(engine, tablet,
6776
"CumulativeCompaction:" + std::to_string(tablet->tablet_id())) {}
@@ -128,10 +137,11 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
128137
std::vector<Version> missing_versions;
129138
find_longest_consecutive_version(&candidate_rowsets, &missing_versions);
130139
if (!missing_versions.empty()) {
131-
DCHECK(missing_versions.size() == 2);
140+
DCHECK(missing_versions.size() % 2 == 0);
132141
LOG(WARNING) << "There are missed versions among rowsets. "
133-
<< "prev rowset verison=" << missing_versions[0]
134-
<< ", next rowset version=" << missing_versions[1]
142+
<< "total missed version size: " << missing_versions.size() / 2
143+
<< " first missed version prev rowset verison=" << missing_versions[0]
144+
<< ", first missed version next rowset version=" << missing_versions[1]
135145
<< ", tablet=" << _tablet->tablet_id();
136146
}
137147

be/src/olap/cumulative_compaction.h

+3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ class CumulativeCompaction final : public CompactionMixin {
4444

4545
Status pick_rowsets_to_compact();
4646

47+
void find_longest_consecutive_version(std::vector<RowsetSharedPtr>* rowsets,
48+
std::vector<Version>* missing_version);
49+
4750
Version _last_delete_version {-1, -1};
4851
};
4952

+266
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "olap/cumulative_compaction.h"
19+
20+
#include <gmock/gmock-actions.h>
21+
#include <gmock/gmock-matchers.h>
22+
#include <gtest/gtest-message.h>
23+
#include <gtest/gtest-test-part.h>
24+
#include <gtest/gtest.h>
25+
26+
#include <filesystem>
27+
#include <memory>
28+
29+
#include "common/status.h"
30+
#include "cpp/sync_point.h"
31+
#include "gtest/gtest_pred_impl.h"
32+
#include "io/fs/local_file_system.h"
33+
#include "olap/cumulative_compaction_policy.h"
34+
#include "olap/data_dir.h"
35+
#include "olap/rowset/rowset_factory.h"
36+
#include "olap/storage_engine.h"
37+
#include "olap/tablet_manager.h"
38+
#include "util/threadpool.h"
39+
40+
namespace doris {
41+
using namespace config;
42+
43+
class CumulativeCompactionTest : public testing::Test {
44+
public:
45+
virtual void SetUp() {}
46+
47+
virtual void TearDown() {}
48+
};
49+
50+
static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping,
51+
int data_size) {
52+
auto rs_meta = std::make_shared<RowsetMeta>();
53+
rs_meta->set_rowset_type(BETA_ROWSET); // important
54+
rs_meta->_rowset_meta_pb.set_start_version(version.first);
55+
rs_meta->_rowset_meta_pb.set_end_version(version.second);
56+
rs_meta->set_num_segments(num_segments);
57+
rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
58+
rs_meta->set_total_disk_size(data_size);
59+
RowsetSharedPtr rowset;
60+
Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset);
61+
if (!st.ok()) {
62+
return nullptr;
63+
}
64+
return rowset;
65+
}
66+
67+
TEST_F(CumulativeCompactionTest, TestConsecutiveVersion) {
68+
EngineOptions options;
69+
StorageEngine storage_engine(options);
70+
//TabletSharedPtr tablet;
71+
72+
TabletMetaSharedPtr tablet_meta;
73+
tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
74+
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
75+
TCompressionType::LZ4F));
76+
TabletSharedPtr tablet(
77+
new Tablet(storage_engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
78+
79+
CumulativeCompaction cumu_compaction(storage_engine, tablet);
80+
81+
{
82+
std::vector<RowsetSharedPtr> rowsets;
83+
for (int i = 2; i < 10; ++i) {
84+
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
85+
rowsets.push_back(rs);
86+
}
87+
std::vector<Version> missing_version;
88+
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
89+
EXPECT_EQ(rowsets.size(), 8);
90+
EXPECT_EQ(rowsets.front()->start_version(), 2);
91+
EXPECT_EQ(rowsets.front()->end_version(), 2);
92+
93+
EXPECT_EQ(rowsets.back()->start_version(), 9);
94+
EXPECT_EQ(rowsets.back()->end_version(), 9);
95+
96+
EXPECT_EQ(missing_version.size(), 0);
97+
}
98+
99+
{
100+
std::vector<RowsetSharedPtr> rowsets;
101+
for (int i = 2; i <= 4; ++i) {
102+
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
103+
rowsets.push_back(rs);
104+
}
105+
106+
for (int i = 6; i <= 10; ++i) {
107+
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
108+
rowsets.push_back(rs);
109+
}
110+
111+
for (int i = 12; i <= 13; ++i) {
112+
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
113+
rowsets.push_back(rs);
114+
}
115+
116+
std::vector<Version> missing_version;
117+
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
118+
119+
EXPECT_EQ(rowsets.size(), 5);
120+
EXPECT_EQ(rowsets.front()->start_version(), 6);
121+
EXPECT_EQ(rowsets.front()->end_version(), 6);
122+
EXPECT_EQ(rowsets.back()->start_version(), 10);
123+
EXPECT_EQ(rowsets.back()->end_version(), 10);
124+
125+
EXPECT_EQ(missing_version.size(), 4);
126+
EXPECT_EQ(missing_version[0].first, 4);
127+
EXPECT_EQ(missing_version[0].second, 4);
128+
EXPECT_EQ(missing_version[1].first, 6);
129+
EXPECT_EQ(missing_version[1].second, 6);
130+
EXPECT_EQ(missing_version[2].first, 10);
131+
EXPECT_EQ(missing_version[2].second, 10);
132+
EXPECT_EQ(missing_version[3].first, 12);
133+
EXPECT_EQ(missing_version[3].second, 12);
134+
}
135+
136+
{
137+
std::vector<RowsetSharedPtr> rowsets;
138+
for (int i = 2; i <= 2; ++i) {
139+
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
140+
rowsets.push_back(rs);
141+
}
142+
143+
for (int i = 4; i <= 4; ++i) {
144+
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
145+
rowsets.push_back(rs);
146+
}
147+
148+
std::vector<Version> missing_version;
149+
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
150+
151+
EXPECT_EQ(rowsets.size(), 1);
152+
EXPECT_EQ(rowsets.front()->start_version(), 2);
153+
EXPECT_EQ(rowsets.front()->end_version(), 2);
154+
EXPECT_EQ(rowsets.back()->start_version(), 2);
155+
EXPECT_EQ(rowsets.back()->end_version(), 2);
156+
157+
EXPECT_EQ(missing_version.size(), 2);
158+
EXPECT_EQ(missing_version[0].first, 2);
159+
EXPECT_EQ(missing_version[0].second, 2);
160+
EXPECT_EQ(missing_version[1].first, 4);
161+
EXPECT_EQ(missing_version[1].second, 4);
162+
}
163+
164+
{
165+
std::vector<RowsetSharedPtr> rowsets;
166+
RowsetSharedPtr rs = create_rowset({2, 3}, 1, false, 1024);
167+
rowsets.push_back(rs);
168+
rs = create_rowset({4, 5}, 1, false, 1024);
169+
rowsets.push_back(rs);
170+
171+
rs = create_rowset({9, 11}, 1, false, 1024);
172+
rowsets.push_back(rs);
173+
rs = create_rowset({12, 13}, 1, false, 1024);
174+
rowsets.push_back(rs);
175+
176+
std::vector<Version> missing_version;
177+
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
178+
179+
EXPECT_EQ(rowsets.size(), 2);
180+
EXPECT_EQ(rowsets.front()->start_version(), 2);
181+
EXPECT_EQ(rowsets.front()->end_version(), 3);
182+
EXPECT_EQ(rowsets.back()->start_version(), 4);
183+
EXPECT_EQ(rowsets.back()->end_version(), 5);
184+
185+
EXPECT_EQ(missing_version.size(), 2);
186+
EXPECT_EQ(missing_version[0].first, 4);
187+
EXPECT_EQ(missing_version[0].second, 5);
188+
EXPECT_EQ(missing_version[1].first, 9);
189+
EXPECT_EQ(missing_version[1].second, 11);
190+
}
191+
192+
{
193+
std::vector<RowsetSharedPtr> rowsets;
194+
for (int i = 2; i <= 2; ++i) {
195+
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
196+
rowsets.push_back(rs);
197+
}
198+
199+
std::vector<Version> missing_version;
200+
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
201+
EXPECT_EQ(rowsets.size(), 1);
202+
EXPECT_EQ(rowsets.front()->start_version(), 2);
203+
EXPECT_EQ(rowsets.front()->end_version(), 2);
204+
205+
EXPECT_EQ(rowsets.back()->start_version(), 2);
206+
EXPECT_EQ(rowsets.back()->end_version(), 2);
207+
EXPECT_EQ(missing_version.size(), 0);
208+
}
209+
210+
{
211+
std::vector<RowsetSharedPtr> rowsets;
212+
for (int i = 2; i <= 2; ++i) {
213+
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
214+
rowsets.push_back(rs);
215+
}
216+
217+
std::vector<Version> missing_version;
218+
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
219+
EXPECT_EQ(rowsets.size(), 1);
220+
EXPECT_EQ(rowsets.front()->start_version(), 2);
221+
EXPECT_EQ(rowsets.front()->end_version(), 2);
222+
223+
EXPECT_EQ(rowsets.back()->start_version(), 2);
224+
EXPECT_EQ(rowsets.back()->end_version(), 2);
225+
EXPECT_EQ(missing_version.size(), 0);
226+
}
227+
228+
{
229+
std::vector<RowsetSharedPtr> rowsets;
230+
for (int i = 2; i <= 4; ++i) {
231+
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
232+
rowsets.push_back(rs);
233+
}
234+
235+
for (int i = 6; i <= 10; ++i) {
236+
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
237+
rowsets.push_back(rs);
238+
}
239+
240+
for (int i = 12; i <= 20; ++i) {
241+
RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
242+
rowsets.push_back(rs);
243+
}
244+
245+
std::vector<Version> missing_version;
246+
cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version);
247+
248+
EXPECT_EQ(rowsets.size(), 9);
249+
EXPECT_EQ(rowsets.front()->start_version(), 12);
250+
EXPECT_EQ(rowsets.front()->end_version(), 12);
251+
EXPECT_EQ(rowsets.back()->start_version(), 20);
252+
EXPECT_EQ(rowsets.back()->end_version(), 20);
253+
254+
EXPECT_EQ(missing_version.size(), 4);
255+
EXPECT_EQ(missing_version[0].first, 4);
256+
EXPECT_EQ(missing_version[0].second, 4);
257+
EXPECT_EQ(missing_version[1].first, 6);
258+
EXPECT_EQ(missing_version[1].second, 6);
259+
EXPECT_EQ(missing_version[2].first, 10);
260+
EXPECT_EQ(missing_version[2].second, 10);
261+
EXPECT_EQ(missing_version[3].first, 12);
262+
EXPECT_EQ(missing_version[3].second, 12);
263+
}
264+
}
265+
266+
} // namespace doris

0 commit comments

Comments
 (0)