-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve](cloud-mow) merge and remove old version of delete bitmap when cumulative compaction is done #40204
Changes from all commits
0bda586
16a5c24
49fdc3a
e211689
824195a
15b08bf
21a6149
ee7d484
0590390
4012872
2b5413c
9c5595c
f149034
98c43c7
41bd2f1
0b53123
61ae01c
354250c
314ff84
6d39059
5484a5f
ea5f6e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
#include "cloud/cloud_cumulative_compaction.h" | ||
|
||
#include "cloud/cloud_meta_mgr.h" | ||
#include "cloud/cloud_tablet_mgr.h" | ||
#include "cloud/config.h" | ||
#include "common/config.h" | ||
#include "common/logging.h" | ||
|
@@ -27,6 +28,7 @@ | |
#include "olap/compaction.h" | ||
#include "olap/cumulative_compaction_policy.h" | ||
#include "service/backend_options.h" | ||
#include "util/debug_points.h" | ||
#include "util/trace.h" | ||
#include "util/uuid_generator.h" | ||
|
||
|
@@ -254,10 +256,10 @@ Status CloudCumulativeCompaction::modify_rowsets() { | |
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string()); | ||
|
||
DeleteBitmapPtr output_rowset_delete_bitmap = nullptr; | ||
int64_t initiator = | ||
HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits<int64_t>::max(); | ||
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && | ||
_tablet->enable_unique_key_merge_on_write()) { | ||
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & | ||
std::numeric_limits<int64_t>::max(); | ||
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction( | ||
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(), | ||
_stats.merged_rows, initiator, output_rowset_delete_bitmap, | ||
|
@@ -340,9 +342,88 @@ Status CloudCumulativeCompaction::modify_rowsets() { | |
stats.num_rows(), stats.data_size()); | ||
} | ||
} | ||
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && | ||
_tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) { | ||
process_old_version_delete_bitmap(); | ||
} | ||
return Status::OK(); | ||
} | ||
|
||
void CloudCumulativeCompaction::process_old_version_delete_bitmap() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: function 'process_old_version_delete_bitmap' exceeds recommended size/complexity thresholds [readability-function-size] void CloudCumulativeCompaction::process_old_version_delete_bitmap() {
^ Additional contextbe/src/cloud/cloud_cumulative_compaction.cpp:349: 110 lines including whitespace and comments (threshold 80) void CloudCumulativeCompaction::process_old_version_delete_bitmap() {
^ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: function 'process_old_version_delete_bitmap' has cognitive complexity of 63 (threshold 50) [readability-function-cognitive-complexity] void CloudCumulativeCompaction::process_old_version_delete_bitmap() {
^ Additional contextbe/src/cloud/cloud_cumulative_compaction.cpp:362: +1, including nesting penalty of 0, nesting level increased to 1 if (!pre_rowsets.empty()) {
^ be/src/cloud/cloud_cumulative_compaction.cpp:388: +2, including nesting penalty of 1, nesting level increased to 2 do {
^ be/src/cloud/cloud_cumulative_compaction.cpp:389: +3, including nesting penalty of 2, nesting level increased to 3 if (!new_delete_bitmap->empty()) {
^ be/src/cloud/cloud_cumulative_compaction.cpp:393: +4, including nesting penalty of 3, nesting level increased to 4 DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.get_mow_lock_failed", {
^ be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF' if (UNLIKELY(config::enable_debug_points)) { \
^ be/src/cloud/cloud_cumulative_compaction.cpp:393: +5, including nesting penalty of 4, nesting level increased to 5 DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.get_mow_lock_failed", {
^ be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF' if (dp) { \
^ be/src/cloud/cloud_cumulative_compaction.cpp:399: +4, including nesting penalty of 3, nesting level increased to 4 if (get_st.ok()) {
^ be/src/cloud/cloud_cumulative_compaction.cpp:403: +4, including nesting penalty of 3, nesting level increased to 4 if (!get_st.ok()) {
^ be/src/cloud/cloud_cumulative_compaction.cpp:412: +4, including nesting penalty of 3, nesting level increased to 4 DBUG_EXECUTE_IF(
^ be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF' if (UNLIKELY(config::enable_debug_points)) { \
^ be/src/cloud/cloud_cumulative_compaction.cpp:412: +5, including nesting penalty of 4, nesting level increased to 5 DBUG_EXECUTE_IF(
^ be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF' if (dp) { \
^ be/src/cloud/cloud_cumulative_compaction.cpp:418: +4, including nesting penalty of 3, nesting level increased to 4 if (update_st.ok()) {
^ be/src/cloud/cloud_cumulative_compaction.cpp:422: +4, including nesting penalty of 3, nesting level increased to 4 if (!update_st.ok()) {
^ be/src/cloud/cloud_cumulative_compaction.cpp:429: +4, including nesting penalty of 3, nesting level increased to 4 DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.remove_mow_lock_failed", {
^ be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF' if (UNLIKELY(config::enable_debug_points)) { \
^ be/src/cloud/cloud_cumulative_compaction.cpp:429: +5, including nesting penalty of 4, nesting level increased to 5 DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.remove_mow_lock_failed", {
^ be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF' if (dp) { \
^ be/src/cloud/cloud_cumulative_compaction.cpp:435: +4, including nesting penalty of 3, nesting level increased to 4 if (remove_st.ok()) {
^ be/src/cloud/cloud_cumulative_compaction.cpp:439: +4, including nesting penalty of 3, nesting level increased to 4 if (!remove_st.ok()) {
^ be/src/cloud/cloud_cumulative_compaction.cpp:449: +2, including nesting penalty of 1, nesting level increased to 2 if (get_st.ok() && update_st.ok() && remove_st.ok()) {
^ be/src/cloud/cloud_cumulative_compaction.cpp:449: +1 if (get_st.ok() && update_st.ok() && remove_st.ok()) {
^ be/src/cloud/cloud_cumulative_compaction.cpp:454: +3, including nesting penalty of 2, nesting level increased to 3 for (auto it = new_delete_bitmap->delete_bitmap.begin();
^ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: function 'process_old_version_delete_bitmap' exceeds recommended size/complexity thresholds [readability-function-size] void CloudCumulativeCompaction::process_old_version_delete_bitmap() {
^ Additional contextbe/src/cloud/cloud_cumulative_compaction.cpp:350: 110 lines including whitespace and comments (threshold 80) void CloudCumulativeCompaction::process_old_version_delete_bitmap() {
^ |
||
// agg previously rowset old version delete bitmap | ||
std::vector<RowsetSharedPtr> pre_rowsets {}; | ||
std::vector<std::string> pre_rowset_ids {}; | ||
for (const auto& it : cloud_tablet()->rowset_map()) { | ||
if (it.first.second < _input_rowsets.front()->start_version()) { | ||
pre_rowsets.emplace_back(it.second); | ||
pre_rowset_ids.emplace_back(it.second->rowset_id().to_string()); | ||
} | ||
} | ||
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator); | ||
if (!pre_rowsets.empty()) { | ||
auto pre_max_version = _output_rowset->version().second; | ||
DeleteBitmapPtr new_delete_bitmap = | ||
std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id()); | ||
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>> | ||
to_remove_vec; | ||
for (auto& rowset : pre_rowsets) { | ||
if (rowset->rowset_meta()->total_disk_size() == 0) { | ||
continue; | ||
} | ||
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) { | ||
rowset->rowset_id().to_string(); | ||
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0}; | ||
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version}; | ||
DeleteBitmap::BitmapKey before_end {rowset->rowset_id(), seg_id, | ||
pre_max_version - 1}; | ||
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg( | ||
{rowset->rowset_id(), seg_id, pre_max_version}); | ||
to_remove_vec.emplace_back( | ||
std::make_tuple(_tablet->tablet_id(), start, before_end)); | ||
if (d->isEmpty()) { | ||
continue; | ||
} | ||
new_delete_bitmap->set(end, *d); | ||
} | ||
} | ||
if (!new_delete_bitmap->empty()) { | ||
// store agg delete bitmap | ||
Status update_st; | ||
DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.update_delete_bitmap_failed", | ||
{ | ||
update_st = Status::InternalError( | ||
"test fail to update delete bitmap for tablet_id {}", | ||
cloud_tablet()->tablet_id()); | ||
}); | ||
if (update_st.ok()) { | ||
update_st = _engine.meta_mgr().update_delete_bitmap_without_lock( | ||
*cloud_tablet(), new_delete_bitmap.get()); | ||
} | ||
if (!update_st.ok()) { | ||
std::stringstream ss; | ||
ss << "failed to update delete bitmap for tablet=" << cloud_tablet()->tablet_id() | ||
<< " st=" << update_st.to_string(); | ||
std::string msg = ss.str(); | ||
LOG(WARNING) << msg; | ||
} else { | ||
Version version(_input_rowsets.front()->start_version(), | ||
_input_rowsets.back()->end_version()); | ||
for (auto it = new_delete_bitmap->delete_bitmap.begin(); | ||
it != new_delete_bitmap->delete_bitmap.end(); it++) { | ||
_tablet->tablet_meta()->delete_bitmap().set(it->first, it->second); | ||
} | ||
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(), | ||
to_remove_vec); | ||
DBUG_EXECUTE_IF( | ||
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", { | ||
static_cast<CloudTablet*>(_tablet.get()) | ||
->delete_expired_stale_rowsets(); | ||
}); | ||
} | ||
} | ||
} | ||
} | ||
|
||
void CloudCumulativeCompaction::garbage_collection() { | ||
CloudCompactionMixin::garbage_collection(); | ||
cloud::TabletJobInfoPB job; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#include "cloud_delete_bitmap_action.h" | ||
|
||
#include <rapidjson/document.h> | ||
#include <rapidjson/encodings.h> | ||
#include <rapidjson/prettywriter.h> | ||
#include <rapidjson/rapidjson.h> | ||
#include <rapidjson/stringbuffer.h> | ||
|
||
#include <chrono> // IWYU pragma: keep | ||
#include <exception> | ||
#include <future> | ||
#include <memory> | ||
#include <mutex> | ||
#include <sstream> | ||
#include <string> | ||
#include <thread> | ||
#include <utility> | ||
|
||
#include "cloud/cloud_tablet.h" | ||
#include "cloud/cloud_tablet_mgr.h" | ||
#include "common/logging.h" | ||
#include "common/status.h" | ||
#include "gutil/strings/substitute.h" | ||
#include "http/http_channel.h" | ||
#include "http/http_headers.h" | ||
#include "http/http_request.h" | ||
#include "http/http_status.h" | ||
#include "olap/olap_define.h" | ||
#include "olap/storage_engine.h" | ||
#include "olap/tablet_manager.h" | ||
#include "util/doris_metrics.h" | ||
#include "util/stopwatch.hpp" | ||
|
||
namespace doris { | ||
using namespace ErrorCode; | ||
|
||
namespace { | ||
|
||
constexpr std::string_view HEADER_JSON = "application/json"; | ||
|
||
} // namespace | ||
|
||
CloudDeleteBitmapAction::CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, | ||
CloudStorageEngine& engine, | ||
TPrivilegeHier::type hier, | ||
TPrivilegeType::type ptype) | ||
: HttpHandlerWithAuth(exec_env, hier, ptype), | ||
_engine(engine), | ||
_delete_bitmap_action_type(ctype) {} | ||
|
||
static Status _check_param(HttpRequest* req, uint64_t* tablet_id) { | ||
const auto& req_tablet_id = req->param(TABLET_ID_KEY); | ||
if (req_tablet_id.empty()) { | ||
return Status::InternalError("tablet id is empty!"); | ||
} | ||
try { | ||
*tablet_id = std::stoull(req_tablet_id); | ||
} catch (const std::exception& e) { | ||
return Status::InternalError("convert tablet_id failed, {}", e.what()); | ||
} | ||
return Status::OK(); | ||
} | ||
|
||
Status CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* req, | ||
std::string* json_result) { | ||
uint64_t tablet_id = 0; | ||
// check & retrieve tablet_id from req if it contains | ||
RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed"); | ||
if (tablet_id == 0) { | ||
return Status::InternalError("check param failed: missing tablet_id"); | ||
} | ||
|
||
CloudTabletSPtr tablet = DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id)); | ||
if (tablet == nullptr) { | ||
return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); | ||
} | ||
|
||
auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); | ||
auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality(); | ||
auto size = tablet->tablet_meta()->delete_bitmap().get_size(); | ||
|
||
rapidjson::Document root; | ||
root.SetObject(); | ||
root.AddMember("delete_bitmap_count", count, root.GetAllocator()); | ||
hust-hhb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
root.AddMember("cardinality", cardinality, root.GetAllocator()); | ||
root.AddMember("size", size, root.GetAllocator()); | ||
|
||
// to json string | ||
rapidjson::StringBuffer strbuf; | ||
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf); | ||
root.Accept(writer); | ||
*json_result = std::string(strbuf.GetString()); | ||
|
||
return Status::OK(); | ||
} | ||
|
||
void CloudDeleteBitmapAction::handle(HttpRequest* req) { | ||
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data()); | ||
if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_INFO) { | ||
std::string json_result; | ||
Status st = _handle_show_delete_bitmap_count(req, &json_result); | ||
if (!st.ok()) { | ||
HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); | ||
} else { | ||
HttpChannel::send_reply(req, HttpStatus::OK, json_result); | ||
} | ||
} | ||
} | ||
|
||
} // namespace doris |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#pragma once | ||
|
||
#include <stdint.h> | ||
|
||
#include <string> | ||
|
||
#include "cloud/cloud_storage_engine.h" | ||
#include "common/status.h" | ||
#include "http/http_handler_with_auth.h" | ||
#include "olap/tablet.h" | ||
|
||
namespace doris { | ||
class HttpRequest; | ||
|
||
class ExecEnv; | ||
|
||
enum class DeleteBitmapActionType { COUNT_INFO = 1 }; | ||
|
||
/// This action is used for viewing the delete bitmap status | ||
class CloudDeleteBitmapAction : public HttpHandlerWithAuth { | ||
public: | ||
CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, | ||
CloudStorageEngine& engine, TPrivilegeHier::type hier, | ||
TPrivilegeType::type ptype); | ||
|
||
~CloudDeleteBitmapAction() override = default; | ||
|
||
void handle(HttpRequest* req) override; | ||
|
||
private: | ||
Status _handle_show_delete_bitmap_count(HttpRequest* req, std::string* json_result); | ||
|
||
private: | ||
CloudStorageEngine& _engine; | ||
DeleteBitmapActionType _delete_bitmap_action_type; | ||
}; | ||
} // namespace doris |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'process_old_version_delete_bitmap' has cognitive complexity of 63 (threshold 50) [readability-function-cognitive-complexity]
Additional context
be/src/cloud/cloud_cumulative_compaction.cpp:361: +1, including nesting penalty of 0, nesting level increased to 1
if (!pre_rowsets.empty()) { ^
be/src/cloud/cloud_cumulative_compaction.cpp:387: +2, including nesting penalty of 1, nesting level increased to 2
do { ^
be/src/cloud/cloud_cumulative_compaction.cpp:388: +3, including nesting penalty of 2, nesting level increased to 3
be/src/cloud/cloud_cumulative_compaction.cpp:392: +4, including nesting penalty of 3, nesting level increased to 4
be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'
if (UNLIKELY(config::enable_debug_points)) { \ ^
be/src/cloud/cloud_cumulative_compaction.cpp:392: +5, including nesting penalty of 4, nesting level increased to 5
be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'
if (dp) { \ ^
be/src/cloud/cloud_cumulative_compaction.cpp:398: +4, including nesting penalty of 3, nesting level increased to 4
if (get_st.ok()) { ^
be/src/cloud/cloud_cumulative_compaction.cpp:402: +4, including nesting penalty of 3, nesting level increased to 4
if (!get_st.ok()) { ^
be/src/cloud/cloud_cumulative_compaction.cpp:411: +4, including nesting penalty of 3, nesting level increased to 4
DBUG_EXECUTE_IF( ^
be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'
if (UNLIKELY(config::enable_debug_points)) { \ ^
be/src/cloud/cloud_cumulative_compaction.cpp:411: +5, including nesting penalty of 4, nesting level increased to 5
DBUG_EXECUTE_IF( ^
be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'
if (dp) { \ ^
be/src/cloud/cloud_cumulative_compaction.cpp:417: +4, including nesting penalty of 3, nesting level increased to 4
if (update_st.ok()) { ^
be/src/cloud/cloud_cumulative_compaction.cpp:421: +4, including nesting penalty of 3, nesting level increased to 4
if (!update_st.ok()) { ^
be/src/cloud/cloud_cumulative_compaction.cpp:428: +4, including nesting penalty of 3, nesting level increased to 4
be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'
if (UNLIKELY(config::enable_debug_points)) { \ ^
be/src/cloud/cloud_cumulative_compaction.cpp:428: +5, including nesting penalty of 4, nesting level increased to 5
be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'
if (dp) { \ ^
be/src/cloud/cloud_cumulative_compaction.cpp:434: +4, including nesting penalty of 3, nesting level increased to 4
if (remove_st.ok()) { ^
be/src/cloud/cloud_cumulative_compaction.cpp:438: +4, including nesting penalty of 3, nesting level increased to 4
if (!remove_st.ok()) { ^
be/src/cloud/cloud_cumulative_compaction.cpp:448: +2, including nesting penalty of 1, nesting level increased to 2
if (get_st.ok() && update_st.ok() && remove_st.ok()) { ^
be/src/cloud/cloud_cumulative_compaction.cpp:448: +1
if (get_st.ok() && update_st.ok() && remove_st.ok()) { ^
be/src/cloud/cloud_cumulative_compaction.cpp:453: +3, including nesting penalty of 2, nesting level increased to 3