Skip to content

Commit f9af921

Browse files
committed
update
tmp tmp tmp tmp finish finish case tmp fix fix complie tmp tmp tmp tmp
1 parent 4b58cee commit f9af921

13 files changed

+571
-77
lines changed

be/src/olap/partial_update_info.cpp

+154
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "olap/partial_update_info.h"
19+
20+
#include <gen_cpp/olap_file.pb.h>
21+
22+
#include "olap/tablet_schema.h"
23+
24+
namespace doris {
25+
26+
void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
27+
const std::set<string>& partial_update_cols, bool is_strict_mode,
28+
int64_t timestamp_ms, const std::string& timezone,
29+
const std::string& auto_increment_column, int64_t cur_max_version) {
30+
is_partial_update = partial_update;
31+
partial_update_input_columns = partial_update_cols;
32+
max_version_in_flush_phase = cur_max_version;
33+
this->timestamp_ms = timestamp_ms;
34+
this->timezone = timezone;
35+
missing_cids.clear();
36+
update_cids.clear();
37+
for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
38+
auto tablet_column = tablet_schema.column(i);
39+
if (!partial_update_input_columns.contains(tablet_column.name())) {
40+
missing_cids.emplace_back(i);
41+
if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
42+
tablet_schema.auto_increment_column() != tablet_column.name()) {
43+
can_insert_new_rows_in_partial_update = false;
44+
}
45+
} else {
46+
update_cids.emplace_back(i);
47+
}
48+
if (auto_increment_column == tablet_column.name()) {
49+
is_schema_contains_auto_inc_column = true;
50+
}
51+
}
52+
this->is_strict_mode = is_strict_mode;
53+
is_input_columns_contains_auto_inc_column =
54+
is_partial_update && partial_update_input_columns.contains(auto_increment_column);
55+
_generate_default_values_for_missing_cids(tablet_schema);
56+
}
57+
58+
void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
59+
partial_update_info_pb->set_is_partial_update(is_partial_update);
60+
partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase);
61+
for (const auto& col : partial_update_input_columns) {
62+
partial_update_info_pb->add_partial_update_input_columns(col);
63+
}
64+
for (auto cid : missing_cids) {
65+
partial_update_info_pb->add_missing_cids(cid);
66+
}
67+
for (auto cid : update_cids) {
68+
partial_update_info_pb->add_update_cids(cid);
69+
}
70+
partial_update_info_pb->set_can_insert_new_rows_in_partial_update(
71+
can_insert_new_rows_in_partial_update);
72+
partial_update_info_pb->set_is_strict_mode(is_strict_mode);
73+
partial_update_info_pb->set_timestamp_ms(timestamp_ms);
74+
partial_update_info_pb->set_timezone(timezone);
75+
partial_update_info_pb->set_is_input_columns_contains_auto_inc_column(
76+
is_input_columns_contains_auto_inc_column);
77+
partial_update_info_pb->set_is_schema_contains_auto_inc_column(
78+
is_schema_contains_auto_inc_column);
79+
for (const auto& value : default_values) {
80+
partial_update_info_pb->add_default_values(value);
81+
}
82+
}
83+
84+
void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
85+
is_partial_update = partial_update_info_pb->is_partial_update();
86+
max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase()
87+
? partial_update_info_pb->max_version_in_flush_phase()
88+
: -1;
89+
partial_update_input_columns.clear();
90+
for (const auto& col : partial_update_info_pb->partial_update_input_columns()) {
91+
partial_update_input_columns.insert(col);
92+
}
93+
missing_cids.clear();
94+
for (auto cid : partial_update_info_pb->missing_cids()) {
95+
missing_cids.push_back(cid);
96+
}
97+
update_cids.clear();
98+
for (auto cid : partial_update_info_pb->update_cids()) {
99+
update_cids.push_back(cid);
100+
}
101+
can_insert_new_rows_in_partial_update =
102+
partial_update_info_pb->can_insert_new_rows_in_partial_update();
103+
is_strict_mode = partial_update_info_pb->is_strict_mode();
104+
timestamp_ms = partial_update_info_pb->timestamp_ms();
105+
timezone = partial_update_info_pb->timezone();
106+
is_input_columns_contains_auto_inc_column =
107+
partial_update_info_pb->is_input_columns_contains_auto_inc_column();
108+
is_schema_contains_auto_inc_column =
109+
partial_update_info_pb->is_schema_contains_auto_inc_column();
110+
default_values.clear();
111+
for (const auto& value : partial_update_info_pb->default_values()) {
112+
default_values.push_back(value);
113+
}
114+
}
115+
116+
std::string PartialUpdateInfo::summary() const {
117+
return fmt::format("update_cids:{}, missing_cids:{}, is_strict_mode:{}", update_cids.size(),
118+
missing_cids.size(), is_strict_mode);
119+
}
120+
121+
void PartialUpdateInfo::_generate_default_values_for_missing_cids(
122+
const TabletSchema& tablet_schema) {
123+
for (unsigned int cur_cid : missing_cids) {
124+
const auto& column = tablet_schema.column(cur_cid);
125+
if (column.has_default_value()) {
126+
std::string default_value;
127+
if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
128+
FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
129+
to_lower(tablet_schema.column(cur_cid).default_value())
130+
.find(to_lower("CURRENT_TIMESTAMP")) !=
131+
std::string::npos)) {
132+
DateV2Value<DateTimeV2ValueType> dtv;
133+
dtv.from_unixtime(timestamp_ms / 1000, timezone);
134+
default_value = dtv.debug_string();
135+
} else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
136+
FieldType::OLAP_FIELD_TYPE_DATEV2 &&
137+
to_lower(tablet_schema.column(cur_cid).default_value())
138+
.find(to_lower("CURRENT_DATE")) !=
139+
std::string::npos)) {
140+
DateV2Value<DateV2ValueType> dv;
141+
dv.from_unixtime(timestamp_ms / 1000, timezone);
142+
default_value = dv.debug_string();
143+
} else {
144+
default_value = tablet_schema.column(cur_cid).default_value();
145+
}
146+
default_values.emplace_back(default_value);
147+
} else {
148+
// place an empty string here
149+
default_values.emplace_back();
150+
}
151+
}
152+
CHECK_EQ(missing_cids.size(), default_values.size());
153+
}
154+
} // namespace doris

be/src/olap/partial_update_info.h

+11-64
Original file line numberDiff line numberDiff line change
@@ -16,78 +16,25 @@
1616
// under the License.
1717

1818
#pragma once
19-
20-
#include "olap/tablet_schema.h"
19+
#include <set>
20+
#include <string>
21+
#include <vector>
2122

2223
namespace doris {
24+
class TabletSchema;
25+
class PartialUpdateInfoPB;
2326

2427
struct PartialUpdateInfo {
2528
void init(const TabletSchema& tablet_schema, bool partial_update,
26-
const std::set<string>& partial_update_cols, bool is_strict_mode,
29+
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
2730
int64_t timestamp_ms, const std::string& timezone,
28-
const std::string& auto_increment_column, int64_t cur_max_version = -1) {
29-
is_partial_update = partial_update;
30-
partial_update_input_columns = partial_update_cols;
31-
max_version_in_flush_phase = cur_max_version;
32-
this->timestamp_ms = timestamp_ms;
33-
this->timezone = timezone;
34-
missing_cids.clear();
35-
update_cids.clear();
36-
for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
37-
auto tablet_column = tablet_schema.column(i);
38-
if (!partial_update_input_columns.contains(tablet_column.name())) {
39-
missing_cids.emplace_back(i);
40-
if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
41-
tablet_schema.auto_increment_column() != tablet_column.name()) {
42-
can_insert_new_rows_in_partial_update = false;
43-
}
44-
} else {
45-
update_cids.emplace_back(i);
46-
}
47-
if (auto_increment_column == tablet_column.name()) {
48-
is_schema_contains_auto_inc_column = true;
49-
}
50-
}
51-
this->is_strict_mode = is_strict_mode;
52-
is_input_columns_contains_auto_inc_column =
53-
is_partial_update && partial_update_input_columns.contains(auto_increment_column);
54-
_generate_default_values_for_missing_cids(tablet_schema);
55-
}
31+
const std::string& auto_increment_column, int64_t cur_max_version = -1);
32+
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
33+
void from_pb(PartialUpdateInfoPB* partial_update_info);
34+
std::string summary() const;
5635

5736
private:
58-
void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema) {
59-
for (auto i = 0; i < missing_cids.size(); ++i) {
60-
auto cur_cid = missing_cids[i];
61-
const auto& column = tablet_schema.column(cur_cid);
62-
if (column.has_default_value()) {
63-
std::string default_value;
64-
if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
65-
FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
66-
to_lower(tablet_schema.column(cur_cid).default_value())
67-
.find(to_lower("CURRENT_TIMESTAMP")) !=
68-
std::string::npos)) {
69-
DateV2Value<DateTimeV2ValueType> dtv;
70-
dtv.from_unixtime(timestamp_ms / 1000, timezone);
71-
default_value = dtv.debug_string();
72-
} else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
73-
FieldType::OLAP_FIELD_TYPE_DATEV2 &&
74-
to_lower(tablet_schema.column(cur_cid).default_value())
75-
.find(to_lower("CURRENT_DATE")) !=
76-
std::string::npos)) {
77-
DateV2Value<DateV2ValueType> dv;
78-
dv.from_unixtime(timestamp_ms / 1000, timezone);
79-
default_value = dv.debug_string();
80-
} else {
81-
default_value = tablet_schema.column(cur_cid).default_value();
82-
}
83-
default_values.emplace_back(default_value);
84-
} else {
85-
// place an empty string here
86-
default_values.emplace_back();
87-
}
88-
}
89-
CHECK_EQ(missing_cids.size(), default_values.size());
90-
}
37+
void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema);
9138

9239
public:
9340
bool is_partial_update {false};

be/src/olap/rowset/rowset_meta_manager.cpp

+82
Original file line numberDiff line numberDiff line change
@@ -533,4 +533,86 @@ Status RowsetMetaManager::load_json_rowset_meta(OlapMeta* meta,
533533
return status;
534534
}
535535

536+
Status RowsetMetaManager::save_partial_update_info(
537+
OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id,
538+
const PartialUpdateInfoPB& partial_update_info_pb) {
539+
std::string key =
540+
fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
541+
std::string value;
542+
if (!partial_update_info_pb.SerializeToString(&value)) {
543+
return Status::Error<SERIALIZE_PROTOBUF_ERROR>(
544+
"serialize partial update info failed. key:{}", key);
545+
}
546+
VLOG_NOTICE << "save partial update info, key=" << key << ", value_size=" << value.size();
547+
return meta->put(META_COLUMN_FAMILY_INDEX, key, value);
548+
}
549+
550+
Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
551+
int64_t partition_id, int64_t txn_id,
552+
PartialUpdateInfoPB* partial_update_info_pb) {
553+
std::string key =
554+
fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
555+
std::string value;
556+
Status status = meta->get(META_COLUMN_FAMILY_INDEX, key, &value);
557+
if (status.is<META_KEY_NOT_FOUND>()) {
558+
return status;
559+
}
560+
if (!status.ok()) {
561+
LOG_WARNING("failed to get partial update info. tablet_id={}, txn_id={}", tablet_id,
562+
txn_id);
563+
return status;
564+
}
565+
if (!partial_update_info_pb->ParseFromString(value)) {
566+
return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
567+
"fail to parse partial update info content to protobuf object. tablet_id={}, "
568+
"txn_id={}",
569+
tablet_id, txn_id);
570+
}
571+
return Status::OK();
572+
}
573+
574+
Status RowsetMetaManager::traverse_partial_update_info(
575+
OlapMeta* meta,
576+
std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func) {
577+
auto traverse_partial_update_info_func = [&func](std::string_view key,
578+
std::string_view value) -> bool {
579+
std::vector<std::string> parts;
580+
// key format: pui_{tablet_id}_{partition_id}_{txn_id}
581+
RETURN_IF_ERROR(split_string(key, '_', &parts));
582+
if (parts.size() != 4) {
583+
LOG_WARNING("invalid rowset key={}, splitted size={}", key, parts.size());
584+
return true;
585+
}
586+
int64_t tablet_id = std::stoll(parts[1]);
587+
int64_t partition_id = std::stoll(parts[3]);
588+
int64_t txn_id = std::stoll(parts[3]);
589+
return func(tablet_id, partition_id, txn_id, value);
590+
};
591+
return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX,
592+
traverse_partial_update_info_func);
593+
}
594+
595+
Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
596+
int64_t partition_id, int64_t txn_id) {
597+
std::string key =
598+
fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
599+
Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key);
600+
VLOG_NOTICE << "remove partial update info, key=" << key;
601+
return res;
602+
}
603+
604+
Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta,
605+
int64_t tablet_id) {
606+
std::string prefix = fmt::format("{}{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id);
607+
std::vector<std::string> remove_keys;
608+
auto get_remove_keys_func = [&](std::string_view key, std::string_view val) -> bool {
609+
remove_keys.emplace_back(key);
610+
return true;
611+
};
612+
VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id
613+
<< " removed keys size: " << remove_keys.size();
614+
RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func));
615+
return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
616+
}
617+
536618
} // namespace doris

be/src/olap/rowset/rowset_meta_manager.h

+19
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H
1919
#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H
2020

21+
#include <gen_cpp/olap_file.pb.h>
22+
2123
#include <cstdint>
2224
#include <functional>
2325
#include <string>
@@ -32,11 +34,15 @@
3234
namespace doris {
3335
class OlapMeta;
3436
class RowsetMetaPB;
37+
class PartialUpdateInfoPB;
3538
} // namespace doris
3639

3740
namespace doris {
3841
namespace {
3942
const std::string ROWSET_PREFIX = "rst_";
43+
44+
constexpr std::string_view PARTIAL_UPDATE_INFO_PREFIX = "pui_";
45+
4046
} // namespace
4147

4248
// Helper class for managing rowset meta of one root path.
@@ -80,6 +86,19 @@ class RowsetMetaManager {
8086

8187
static Status load_json_rowset_meta(OlapMeta* meta, const std::string& rowset_meta_path);
8288

89+
static Status save_partial_update_info(OlapMeta* meta, int64_t tablet_id, int64_t partition_id,
90+
int64_t txn_id,
91+
const PartialUpdateInfoPB& partial_update_info_pb);
92+
static Status try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
93+
int64_t partition_id, int64_t txn_id,
94+
PartialUpdateInfoPB* partial_update_info_pb);
95+
static Status traverse_partial_update_info(
96+
OlapMeta* meta,
97+
std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func);
98+
static Status remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
99+
int64_t partition_id, int64_t txn_id);
100+
static Status remove_tablet_related_partial_update_info(OlapMeta* meta, int64_t tablet_id);
101+
83102
private:
84103
static Status _save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
85104
const RowsetMetaPB& rowset_meta_pb);

be/src/olap/rowset_builder.cpp

+5-3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "olap/rowset/beta_rowset_writer.h"
4242
#include "olap/rowset/pending_rowset_helper.h"
4343
#include "olap/rowset/rowset_meta.h"
44+
#include "olap/rowset/rowset_meta_manager.h"
4445
#include "olap/rowset/rowset_writer.h"
4546
#include "olap/rowset/rowset_writer_context.h"
4647
#include "olap/schema_change.h"
@@ -333,10 +334,11 @@ Status RowsetBuilder::commit_txn() {
333334
// => update_schema: A(bigint), B(double), C(int), D(int)
334335
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
335336
}
337+
336338
// Transfer ownership of `PendingRowsetGuard` to `TxnManager`
337-
Status res = _engine.txn_manager()->commit_txn(_req.partition_id, *tablet(), _req.txn_id,
338-
_req.load_id, _rowset,
339-
std::move(_pending_rs_guard), false);
339+
Status res = _engine.txn_manager()->commit_txn(
340+
_req.partition_id, *tablet(), _req.txn_id, _req.load_id, _rowset,
341+
std::move(_pending_rs_guard), false, _partial_update_info);
340342

341343
if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
342344
LOG(WARNING) << "Failed to commit txn: " << _req.txn_id

0 commit comments

Comments
 (0)