Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve](mow) merge and remove old version of delete bitmap when cumulative compaction is done on local mode #41636

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 5 additions & 23 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,28 +383,10 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
if (!pre_rowsets.empty()) {
auto pre_max_version = _output_rowset->version().second;
DeleteBitmapPtr new_delete_bitmap =
std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>
to_remove_vec;
for (auto& rowset : pre_rowsets) {
if (rowset->rowset_meta()->total_disk_size() == 0) {
continue;
}
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
if (d->isEmpty()) {
continue;
}
new_delete_bitmap->set(end, *d);
}
}
DeleteBitmapPtr new_delete_bitmap = nullptr;
agg_and_remove_old_version_delete_bitmap(pre_rowsets, to_remove_vec, new_delete_bitmap);
if (!new_delete_bitmap->empty()) {
// store agg delete bitmap
DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.update_delete_bitmap_failed",
Expand All @@ -424,9 +406,9 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF(
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets",
{ static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets(); });
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", {
static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets();
});
}
}
return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "cloud_delete_bitmap_action.h"
#include "delete_bitmap_action.h"

#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
Expand All @@ -34,8 +34,10 @@
#include <utility>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
Expand All @@ -44,7 +46,6 @@
#include "http/http_request.h"
#include "http/http_status.h"
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
Expand All @@ -59,10 +60,9 @@ constexpr std::string_view HEADER_JSON = "application/json";

} // namespace

CloudDeleteBitmapAction::CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
CloudStorageEngine& engine,
TPrivilegeHier::type hier,
TPrivilegeType::type ptype)
DeleteBitmapAction::DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
BaseStorageEngine& engine, TPrivilegeHier::type hier,
TPrivilegeType::type ptype)
: HttpHandlerWithAuth(exec_env, hier, ptype),
_engine(engine),
_delete_bitmap_action_type(ctype) {}
Expand All @@ -80,20 +80,24 @@ static Status _check_param(HttpRequest* req, uint64_t* tablet_id) {
return Status::OK();
}

Status CloudDeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
Status DeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
uint64_t tablet_id = 0;
// check & retrieve tablet_id from req if it contains
RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed");
if (tablet_id == 0) {
return Status::InternalError("check param failed: missing tablet_id");
}

CloudTabletSPtr tablet = DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id));
BaseTabletSPtr tablet = nullptr;
if (config::is_cloud_mode()) {
tablet = DORIS_TRY(_engine.to_cloud().tablet_mgr().get_tablet(tablet_id));
} else {
tablet = _engine.to_local().tablet_manager()->get_tablet(tablet_id);
}
if (tablet == nullptr) {
return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
}

auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality();
auto size = tablet->tablet_meta()->delete_bitmap().get_size();
Expand All @@ -115,23 +119,23 @@ Status CloudDeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpReque
return Status::OK();
}

Status CloudDeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
Status DeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
uint64_t tablet_id = 0;
// check & retrieve tablet_id from req if it contains
RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed");
if (tablet_id == 0) {
return Status::InternalError("check param failed: missing tablet_id");
}
TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta);
auto st = _engine.to_cloud().meta_mgr().get_tablet_meta(tablet_id, &tablet_meta);
if (!st.ok()) {
LOG(WARNING) << "failed to get_tablet_meta tablet=" << tablet_id
<< ", st=" << st.to_string();
return st;
}
auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta));
st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true);
auto tablet = std::make_shared<CloudTablet>(_engine.to_cloud(), std::move(tablet_meta));
st = _engine.to_cloud().meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true);
if (!st.ok()) {
LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st;
return st;
Expand All @@ -157,7 +161,7 @@ Status CloudDeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest*
return Status::OK();
}

void CloudDeleteBitmapAction::handle(HttpRequest* req) {
void DeleteBitmapAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_LOCAL) {
std::string json_result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

#include <string>

#include "cloud/cloud_storage_engine.h"
#include "common/status.h"
#include "http/http_handler_with_auth.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"

namespace doris {
Expand All @@ -35,13 +35,12 @@ class ExecEnv;
enum class DeleteBitmapActionType { COUNT_LOCAL = 1, COUNT_MS = 2 };

/// This action is used for viewing the delete bitmap status
class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
class DeleteBitmapAction : public HttpHandlerWithAuth {
public:
CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
CloudStorageEngine& engine, TPrivilegeHier::type hier,
TPrivilegeType::type ptype);
DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, BaseStorageEngine& engine,
TPrivilegeHier::type hier, TPrivilegeType::type ptype);

~CloudDeleteBitmapAction() override = default;
~DeleteBitmapAction() override = default;

void handle(HttpRequest* req) override;

Expand All @@ -50,7 +49,7 @@ class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
Status _handle_show_ms_delete_bitmap_count(HttpRequest* req, std::string* json_result);

private:
CloudStorageEngine& _engine;
BaseStorageEngine& _engine;
DeleteBitmapActionType _delete_bitmap_action_type;
};
#include "common/compile_check_end.h"
Expand Down
64 changes: 64 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,33 @@ Status CloudCompactionMixin::update_delete_bitmap() {
return Status::OK();
}

void Compaction::agg_and_remove_old_version_delete_bitmap(
std::vector<RowsetSharedPtr>& pre_rowsets,
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>&
to_remove_vec,
DeleteBitmapPtr& new_delete_bitmap) {
// agg previously rowset old version delete bitmap
auto pre_max_version = _output_rowset->version().second;
new_delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
for (auto& rowset : pre_rowsets) {
if (rowset->rowset_meta()->total_disk_size() == 0) {
continue;
}
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
if (d->isEmpty()) {
continue;
}
new_delete_bitmap->set(end, *d);
}
}
}

Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) {
// only do index compaction for dup_keys and unique_keys with mow enabled
if (config::inverted_index_compaction_enable &&
Expand Down Expand Up @@ -1103,6 +1130,13 @@ Status CompactionMixin::modify_rowsets() {
tablet()->delete_expired_stale_rowset();
}

if (config::enable_delete_bitmap_merge_on_compaction &&
compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) {
process_old_version_delete_bitmap();
}

int64_t cur_max_version = 0;
{
std::shared_lock rlock(_tablet->get_header_lock());
Expand All @@ -1121,6 +1155,36 @@ Status CompactionMixin::modify_rowsets() {
return Status::OK();
}

void CompactionMixin::process_old_version_delete_bitmap() {
std::vector<RowsetSharedPtr> pre_rowsets {};
for (const auto& it : tablet()->rowset_map()) {
if (it.first.second < _input_rowsets.front()->start_version()) {
pre_rowsets.emplace_back(it.second);
}
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
if (!pre_rowsets.empty()) {
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>
to_remove_vec;
DeleteBitmapPtr new_delete_bitmap = nullptr;
agg_and_remove_old_version_delete_bitmap(pre_rowsets, to_remove_vec, new_delete_bitmap);
if (!new_delete_bitmap->empty()) {
// store agg delete bitmap
Version version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
for (auto it = new_delete_bitmap->delete_bitmap.begin();
it != new_delete_bitmap->delete_bitmap.end(); it++) {
_tablet->tablet_meta()->delete_bitmap().set(it->first, it->second);
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", {
static_cast<Tablet*>(_tablet.get())->delete_expired_stale_rowset();
});
}
}
}

bool CompactionMixin::_check_if_includes_input_rowsets(
const RowsetIdUnorderedSet& commit_rowset_ids_set) const {
std::vector<RowsetId> commit_rowset_ids {};
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ class Compaction {

virtual Status update_delete_bitmap() = 0;

void agg_and_remove_old_version_delete_bitmap(
std::vector<RowsetSharedPtr>& pre_rowsets,
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>&
to_remove_vec,
DeleteBitmapPtr& new_delete_bitmap);

// the root tracker for this compaction
std::shared_ptr<MemTrackerLimiter> _mem_tracker;

Expand Down Expand Up @@ -162,6 +168,8 @@ class CompactionMixin : public Compaction {

Status do_compact_ordered_rowsets();

void process_old_version_delete_bitmap();

bool _check_if_includes_input_rowsets(const RowsetIdUnorderedSet& commit_rowset_ids_set) const;

PendingRowsetGuard _pending_rs_guard;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " << _context.tablet->tablet_id()
<< ", rowset_ids: " << _context.mow_context->rowset_ids.size()
<< ", cur max_version: " << _context.mow_context->max_version
<< ", transaction_id: " << _context.mow_context->txn_id
<< ", transaction_id: " << _context.mow_context->txn_id << ", delete_bitmap_count: "
<< _context.tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count()
<< ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;
return Status::OK();
}
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -817,10 +817,13 @@ void Tablet::delete_expired_stale_rowset() {
auto old_meta_size = _tablet_meta->all_stale_rs_metas().size();

// do delete operation
std::vector<std::string> version_to_delete;
auto to_delete_iter = stale_version_path_map.begin();
while (to_delete_iter != stale_version_path_map.end()) {
std::vector<TimestampedVersionSharedPtr>& to_delete_version =
to_delete_iter->second->timestamped_versions();
int64_t start_version = -1;
int64_t end_version = -1;
for (auto& timestampedVersion : to_delete_version) {
auto it = _stale_rs_version_map.find(timestampedVersion->version());
if (it != _stale_rs_version_map.end()) {
Expand All @@ -841,10 +844,17 @@ void Tablet::delete_expired_stale_rowset() {
<< timestampedVersion->version().second
<< "] not find in stale rs version map";
}
if (start_version < 0) {
start_version = timestampedVersion->version().first;
}
end_version = timestampedVersion->version().second;
_delete_stale_rowset_by_version(timestampedVersion->version());
}
Version version(start_version, end_version);
version_to_delete.emplace_back(version.to_string());
to_delete_iter++;
}
_tablet_meta->delete_bitmap().remove_stale_delete_bitmap_from_queue(version_to_delete);

bool reconstructed = _reconstruct_version_tracker_if_necessary();

Expand Down
18 changes: 14 additions & 4 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,9 @@ uint64_t DeleteBitmap::cardinality() const {
std::shared_lock l(lock);
uint64_t res = 0;
for (auto entry : delete_bitmap) {
res += entry.second.cardinality();
if (std::get<1>(entry.first) != DeleteBitmap::INVALID_SEGMENT_ID) {
res += entry.second.cardinality();
}
}
return res;
}
Expand All @@ -1100,7 +1102,9 @@ uint64_t DeleteBitmap::get_size() const {
std::shared_lock l(lock);
uint64_t charge = 0;
for (auto& [k, v] : delete_bitmap) {
charge += v.getSizeInBytes();
if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) {
charge += v.getSizeInBytes();
}
}
return charge;
}
Expand Down Expand Up @@ -1219,7 +1223,7 @@ void DeleteBitmap::remove_stale_delete_bitmap_from_queue(const std::vector<std::
_stale_delete_bitmap.erase(version_str);
}
}
if (tablet_id == -1 || to_delete.empty()) {
if (tablet_id == -1 || to_delete.empty() || !config::is_cloud_mode()) {
return;
}
CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
Expand All @@ -1232,7 +1236,13 @@ void DeleteBitmap::remove_stale_delete_bitmap_from_queue(const std::vector<std::

uint64_t DeleteBitmap::get_delete_bitmap_count() {
std::shared_lock l(lock);
return delete_bitmap.size();
uint64_t count = 0;
for (auto it = delete_bitmap.begin(); it != delete_bitmap.end(); it++) {
if (std::get<1>(it->first) != DeleteBitmap::INVALID_SEGMENT_ID) {
count++;
}
}
return count;
}

// We cannot just copy the underlying memory to construct a string
Expand Down
Loading
Loading