Skip to content

Commit f72cc38

Browse files
bobhan1dataroaring
authored andcommitted
[Fix](partial update) Persist partial_update_info in RocksDB in case of BE restart after a partial update has commited (#38331)
## Proposed changes If a partial update has conflict with another load during publish phase, it should combine the two load's data into one to get the corrrect result. This procedure needs partial update info. But If BE crashed after the partial update load has committed, the partial update info will be missing becasuse it's not persisted and will not be restored in `DataDir::load()`. This PR persists partial update info in RocksDB before the txn is commited and remove it after the publish phase. Before #25147, partial update info is persisted with tablet_schema in RocksDB. #25147 split partial update info from tablet schema but forget to handle the persistence logic.
1 parent 22c0dae commit f72cc38

13 files changed

+582
-77
lines changed

be/src/olap/partial_update_info.cpp

+155
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "olap/partial_update_info.h"
19+
20+
#include <gen_cpp/olap_file.pb.h>
21+
22+
#include "olap/tablet_schema.h"
23+
24+
namespace doris {
25+
26+
void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
27+
const std::set<string>& partial_update_cols, bool is_strict_mode,
28+
int64_t timestamp_ms, const std::string& timezone,
29+
const std::string& auto_increment_column, int64_t cur_max_version) {
30+
is_partial_update = partial_update;
31+
partial_update_input_columns = partial_update_cols;
32+
max_version_in_flush_phase = cur_max_version;
33+
this->timestamp_ms = timestamp_ms;
34+
this->timezone = timezone;
35+
missing_cids.clear();
36+
update_cids.clear();
37+
for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
38+
auto tablet_column = tablet_schema.column(i);
39+
if (!partial_update_input_columns.contains(tablet_column.name())) {
40+
missing_cids.emplace_back(i);
41+
if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
42+
tablet_schema.auto_increment_column() != tablet_column.name()) {
43+
can_insert_new_rows_in_partial_update = false;
44+
}
45+
} else {
46+
update_cids.emplace_back(i);
47+
}
48+
if (auto_increment_column == tablet_column.name()) {
49+
is_schema_contains_auto_inc_column = true;
50+
}
51+
}
52+
this->is_strict_mode = is_strict_mode;
53+
is_input_columns_contains_auto_inc_column =
54+
is_partial_update && partial_update_input_columns.contains(auto_increment_column);
55+
_generate_default_values_for_missing_cids(tablet_schema);
56+
}
57+
58+
void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
59+
partial_update_info_pb->set_is_partial_update(is_partial_update);
60+
partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase);
61+
for (const auto& col : partial_update_input_columns) {
62+
partial_update_info_pb->add_partial_update_input_columns(col);
63+
}
64+
for (auto cid : missing_cids) {
65+
partial_update_info_pb->add_missing_cids(cid);
66+
}
67+
for (auto cid : update_cids) {
68+
partial_update_info_pb->add_update_cids(cid);
69+
}
70+
partial_update_info_pb->set_can_insert_new_rows_in_partial_update(
71+
can_insert_new_rows_in_partial_update);
72+
partial_update_info_pb->set_is_strict_mode(is_strict_mode);
73+
partial_update_info_pb->set_timestamp_ms(timestamp_ms);
74+
partial_update_info_pb->set_timezone(timezone);
75+
partial_update_info_pb->set_is_input_columns_contains_auto_inc_column(
76+
is_input_columns_contains_auto_inc_column);
77+
partial_update_info_pb->set_is_schema_contains_auto_inc_column(
78+
is_schema_contains_auto_inc_column);
79+
for (const auto& value : default_values) {
80+
partial_update_info_pb->add_default_values(value);
81+
}
82+
}
83+
84+
void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
85+
is_partial_update = partial_update_info_pb->is_partial_update();
86+
max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase()
87+
? partial_update_info_pb->max_version_in_flush_phase()
88+
: -1;
89+
partial_update_input_columns.clear();
90+
for (const auto& col : partial_update_info_pb->partial_update_input_columns()) {
91+
partial_update_input_columns.insert(col);
92+
}
93+
missing_cids.clear();
94+
for (auto cid : partial_update_info_pb->missing_cids()) {
95+
missing_cids.push_back(cid);
96+
}
97+
update_cids.clear();
98+
for (auto cid : partial_update_info_pb->update_cids()) {
99+
update_cids.push_back(cid);
100+
}
101+
can_insert_new_rows_in_partial_update =
102+
partial_update_info_pb->can_insert_new_rows_in_partial_update();
103+
is_strict_mode = partial_update_info_pb->is_strict_mode();
104+
timestamp_ms = partial_update_info_pb->timestamp_ms();
105+
timezone = partial_update_info_pb->timezone();
106+
is_input_columns_contains_auto_inc_column =
107+
partial_update_info_pb->is_input_columns_contains_auto_inc_column();
108+
is_schema_contains_auto_inc_column =
109+
partial_update_info_pb->is_schema_contains_auto_inc_column();
110+
default_values.clear();
111+
for (const auto& value : partial_update_info_pb->default_values()) {
112+
default_values.push_back(value);
113+
}
114+
}
115+
116+
std::string PartialUpdateInfo::summary() const {
117+
return fmt::format(
118+
"update_cids={}, missing_cids={}, is_strict_mode={}, max_version_in_flush_phase={}",
119+
update_cids.size(), missing_cids.size(), is_strict_mode, max_version_in_flush_phase);
120+
}
121+
122+
void PartialUpdateInfo::_generate_default_values_for_missing_cids(
123+
const TabletSchema& tablet_schema) {
124+
for (unsigned int cur_cid : missing_cids) {
125+
const auto& column = tablet_schema.column(cur_cid);
126+
if (column.has_default_value()) {
127+
std::string default_value;
128+
if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
129+
FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
130+
to_lower(tablet_schema.column(cur_cid).default_value())
131+
.find(to_lower("CURRENT_TIMESTAMP")) !=
132+
std::string::npos)) {
133+
DateV2Value<DateTimeV2ValueType> dtv;
134+
dtv.from_unixtime(timestamp_ms / 1000, timezone);
135+
default_value = dtv.debug_string();
136+
} else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
137+
FieldType::OLAP_FIELD_TYPE_DATEV2 &&
138+
to_lower(tablet_schema.column(cur_cid).default_value())
139+
.find(to_lower("CURRENT_DATE")) !=
140+
std::string::npos)) {
141+
DateV2Value<DateV2ValueType> dv;
142+
dv.from_unixtime(timestamp_ms / 1000, timezone);
143+
default_value = dv.debug_string();
144+
} else {
145+
default_value = tablet_schema.column(cur_cid).default_value();
146+
}
147+
default_values.emplace_back(default_value);
148+
} else {
149+
// place an empty string here
150+
default_values.emplace_back();
151+
}
152+
}
153+
CHECK_EQ(missing_cids.size(), default_values.size());
154+
}
155+
} // namespace doris

be/src/olap/partial_update_info.h

+12-64
Original file line numberDiff line numberDiff line change
@@ -16,78 +16,26 @@
1616
// under the License.
1717

1818
#pragma once
19-
20-
#include "olap/tablet_schema.h"
19+
#include <cstdint>
20+
#include <set>
21+
#include <string>
22+
#include <vector>
2123

2224
namespace doris {
25+
class TabletSchema;
26+
class PartialUpdateInfoPB;
2327

2428
struct PartialUpdateInfo {
2529
void init(const TabletSchema& tablet_schema, bool partial_update,
26-
const std::set<string>& partial_update_cols, bool is_strict_mode,
30+
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
2731
int64_t timestamp_ms, const std::string& timezone,
28-
const std::string& auto_increment_column, int64_t cur_max_version = -1) {
29-
is_partial_update = partial_update;
30-
partial_update_input_columns = partial_update_cols;
31-
max_version_in_flush_phase = cur_max_version;
32-
this->timestamp_ms = timestamp_ms;
33-
this->timezone = timezone;
34-
missing_cids.clear();
35-
update_cids.clear();
36-
for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
37-
auto tablet_column = tablet_schema.column(i);
38-
if (!partial_update_input_columns.contains(tablet_column.name())) {
39-
missing_cids.emplace_back(i);
40-
if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
41-
tablet_schema.auto_increment_column() != tablet_column.name()) {
42-
can_insert_new_rows_in_partial_update = false;
43-
}
44-
} else {
45-
update_cids.emplace_back(i);
46-
}
47-
if (auto_increment_column == tablet_column.name()) {
48-
is_schema_contains_auto_inc_column = true;
49-
}
50-
}
51-
this->is_strict_mode = is_strict_mode;
52-
is_input_columns_contains_auto_inc_column =
53-
is_partial_update && partial_update_input_columns.contains(auto_increment_column);
54-
_generate_default_values_for_missing_cids(tablet_schema);
55-
}
32+
const std::string& auto_increment_column, int64_t cur_max_version = -1);
33+
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
34+
void from_pb(PartialUpdateInfoPB* partial_update_info);
35+
std::string summary() const;
5636

5737
private:
58-
void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema) {
59-
for (auto i = 0; i < missing_cids.size(); ++i) {
60-
auto cur_cid = missing_cids[i];
61-
const auto& column = tablet_schema.column(cur_cid);
62-
if (column.has_default_value()) {
63-
std::string default_value;
64-
if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
65-
FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
66-
to_lower(tablet_schema.column(cur_cid).default_value())
67-
.find(to_lower("CURRENT_TIMESTAMP")) !=
68-
std::string::npos)) {
69-
DateV2Value<DateTimeV2ValueType> dtv;
70-
dtv.from_unixtime(timestamp_ms / 1000, timezone);
71-
default_value = dtv.debug_string();
72-
} else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
73-
FieldType::OLAP_FIELD_TYPE_DATEV2 &&
74-
to_lower(tablet_schema.column(cur_cid).default_value())
75-
.find(to_lower("CURRENT_DATE")) !=
76-
std::string::npos)) {
77-
DateV2Value<DateV2ValueType> dv;
78-
dv.from_unixtime(timestamp_ms / 1000, timezone);
79-
default_value = dv.debug_string();
80-
} else {
81-
default_value = tablet_schema.column(cur_cid).default_value();
82-
}
83-
default_values.emplace_back(default_value);
84-
} else {
85-
// place an empty string here
86-
default_values.emplace_back();
87-
}
88-
}
89-
CHECK_EQ(missing_cids.size(), default_values.size());
90-
}
38+
void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema);
9139

9240
public:
9341
bool is_partial_update {false};

be/src/olap/rowset/rowset_meta_manager.cpp

+94
Original file line numberDiff line numberDiff line change
@@ -533,4 +533,98 @@ Status RowsetMetaManager::load_json_rowset_meta(OlapMeta* meta,
533533
return status;
534534
}
535535

536+
Status RowsetMetaManager::save_partial_update_info(
537+
OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id,
538+
const PartialUpdateInfoPB& partial_update_info_pb) {
539+
std::string key =
540+
fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
541+
std::string value;
542+
if (!partial_update_info_pb.SerializeToString(&value)) {
543+
return Status::Error<SERIALIZE_PROTOBUF_ERROR>(
544+
"serialize partial update info failed. key={}", key);
545+
}
546+
VLOG_NOTICE << "save partial update info, key=" << key << ", value_size=" << value.size();
547+
return meta->put(META_COLUMN_FAMILY_INDEX, key, value);
548+
}
549+
550+
Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
551+
int64_t partition_id, int64_t txn_id,
552+
PartialUpdateInfoPB* partial_update_info_pb) {
553+
std::string key =
554+
fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
555+
std::string value;
556+
Status status = meta->get(META_COLUMN_FAMILY_INDEX, key, &value);
557+
if (status.is<META_KEY_NOT_FOUND>()) {
558+
return status;
559+
}
560+
if (!status.ok()) {
561+
LOG_WARNING("failed to get partial update info. tablet_id={}, partition_id={}, txn_id={}",
562+
tablet_id, partition_id, txn_id);
563+
return status;
564+
}
565+
if (!partial_update_info_pb->ParseFromString(value)) {
566+
return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
567+
"fail to parse partial update info content to protobuf object. tablet_id={}, "
568+
"partition_id={}, txn_id={}",
569+
tablet_id, partition_id, txn_id);
570+
}
571+
return Status::OK();
572+
}
573+
574+
Status RowsetMetaManager::traverse_partial_update_info(
575+
OlapMeta* meta,
576+
std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func) {
577+
auto traverse_partial_update_info_func = [&func](std::string_view key,
578+
std::string_view value) -> bool {
579+
std::vector<std::string> parts;
580+
// key format: pui_{tablet_id}_{partition_id}_{txn_id}
581+
RETURN_IF_ERROR(split_string(key, '_', &parts));
582+
if (parts.size() != 4) {
583+
LOG_WARNING("invalid rowset key={}, splitted size={}", key, parts.size());
584+
return true;
585+
}
586+
int64_t tablet_id = std::stoll(parts[1]);
587+
int64_t partition_id = std::stoll(parts[2]);
588+
int64_t txn_id = std::stoll(parts[3]);
589+
return func(tablet_id, partition_id, txn_id, value);
590+
};
591+
return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX,
592+
traverse_partial_update_info_func);
593+
}
594+
595+
Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
596+
int64_t partition_id, int64_t txn_id) {
597+
std::string key =
598+
fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
599+
Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key);
600+
VLOG_NOTICE << "remove partial update info, key=" << key;
601+
return res;
602+
}
603+
604+
Status RowsetMetaManager::remove_partial_update_infos(
605+
OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys) {
606+
std::vector<std::string> remove_keys;
607+
for (auto [tablet_id, partition_id, txn_id] : keys) {
608+
remove_keys.push_back(fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id,
609+
partition_id, txn_id));
610+
}
611+
Status res = meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
612+
VLOG_NOTICE << "remove partial update info, remove_keys.size()=" << remove_keys.size();
613+
return res;
614+
}
615+
616+
Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta,
617+
int64_t tablet_id) {
618+
std::string prefix = fmt::format("{}{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id);
619+
std::vector<std::string> remove_keys;
620+
auto get_remove_keys_func = [&](std::string_view key, std::string_view val) -> bool {
621+
remove_keys.emplace_back(key);
622+
return true;
623+
};
624+
VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id
625+
<< " removed keys size: " << remove_keys.size();
626+
RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func));
627+
return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
628+
}
629+
536630
} // namespace doris

be/src/olap/rowset/rowset_meta_manager.h

+21
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H
1919
#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H
2020

21+
#include <gen_cpp/olap_file.pb.h>
22+
2123
#include <cstdint>
2224
#include <functional>
2325
#include <string>
@@ -32,11 +34,15 @@
3234
namespace doris {
3335
class OlapMeta;
3436
class RowsetMetaPB;
37+
class PartialUpdateInfoPB;
3538
} // namespace doris
3639

3740
namespace doris {
3841
namespace {
3942
const std::string ROWSET_PREFIX = "rst_";
43+
44+
constexpr std::string_view PARTIAL_UPDATE_INFO_PREFIX = "pui_";
45+
4046
} // namespace
4147

4248
// Helper class for managing rowset meta of one root path.
@@ -80,6 +86,21 @@ class RowsetMetaManager {
8086

8187
static Status load_json_rowset_meta(OlapMeta* meta, const std::string& rowset_meta_path);
8288

89+
static Status save_partial_update_info(OlapMeta* meta, int64_t tablet_id, int64_t partition_id,
90+
int64_t txn_id,
91+
const PartialUpdateInfoPB& partial_update_info_pb);
92+
static Status try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
93+
int64_t partition_id, int64_t txn_id,
94+
PartialUpdateInfoPB* partial_update_info_pb);
95+
static Status traverse_partial_update_info(
96+
OlapMeta* meta,
97+
std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func);
98+
static Status remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
99+
int64_t partition_id, int64_t txn_id);
100+
static Status remove_partial_update_infos(
101+
OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys);
102+
static Status remove_tablet_related_partial_update_info(OlapMeta* meta, int64_t tablet_id);
103+
83104
private:
84105
static Status _save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
85106
const RowsetMetaPB& rowset_meta_pb);

0 commit comments

Comments
 (0)