Skip to content

Commit 2a115b4

Browse files
authored
Merge branch 'master' into disable_file_cache
2 parents 3f1238a + 3aad31f commit 2a115b4

File tree

284 files changed

+10687
-3332
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

284 files changed

+10687
-3332
lines changed

.github/CODEOWNERS

+1
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@
1616
#
1717
be/src/io/* @platoneko @gavinchou @dataroaring
1818
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @dataroaring @CalvinKirs @morningman
19+
**/pom.xml @CalvinKirs @morningman

be/src/cloud/cloud_base_compaction.cpp

+37-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "service/backend_options.h"
3030
#include "util/thread.h"
3131
#include "util/uuid_generator.h"
32+
#include "vec/runtime/vdatetime_value.h"
3233

3334
namespace doris {
3435
using namespace ErrorCode;
@@ -82,21 +83,40 @@ Status CloudBaseCompaction::prepare_compact() {
8283
compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
8384
compaction_job->set_base_compaction_cnt(_base_compaction_cnt);
8485
compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt);
86+
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
87+
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
8588
using namespace std::chrono;
8689
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
8790
_expiration = now + config::compaction_timeout_seconds;
8891
compaction_job->set_expiration(_expiration);
8992
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
9093
cloud::StartTabletJobResponse resp;
91-
//auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp);
9294
auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
95+
if (resp.has_alter_version()) {
96+
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
97+
}
9398
if (!st.ok()) {
9499
if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
95100
// set last_sync_time to 0 to force sync tablet next time
96101
cloud_tablet()->last_sync_time_s = 0;
97102
} else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
98103
// tablet not found
99104
cloud_tablet()->clear_cache();
105+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
106+
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
107+
std::stringstream ss;
108+
ss << "failed to prepare cumu compaction. Check compaction input versions "
109+
"failed in schema change. The input version end must "
110+
"less than or equal to alter_version."
111+
"current alter version in BE is not correct."
112+
"input_version_start="
113+
<< compaction_job->input_versions(0)
114+
<< " input_version_end=" << compaction_job->input_versions(1)
115+
<< " current alter_version=" << cloud_tablet->alter_version()
116+
<< " schema_change_alter_version=" << resp.alter_version();
117+
std::string msg = ss.str();
118+
LOG(WARNING) << msg;
119+
return Status::InternalError(msg);
100120
}
101121
return st;
102122
}
@@ -314,6 +334,22 @@ Status CloudBaseCompaction::modify_rowsets() {
314334
if (!st.ok()) {
315335
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
316336
cloud_tablet()->clear_cache();
337+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
338+
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
339+
std::stringstream ss;
340+
ss << "failed to prepare cumu compaction. Check compaction input versions "
341+
"failed in schema change. The input version end must "
342+
"less than or equal to alter_version."
343+
"current alter version in BE is not correct."
344+
"input_version_start="
345+
<< compaction_job->input_versions(0)
346+
<< " input_version_end=" << compaction_job->input_versions(1)
347+
<< " current alter_version=" << cloud_tablet->alter_version()
348+
<< " schema_change_alter_version=" << resp.alter_version();
349+
std::string msg = ss.str();
350+
LOG(WARNING) << msg;
351+
cloud_tablet->set_alter_version(resp.alter_version());
352+
return Status::InternalError(msg);
317353
}
318354
return st;
319355
}

be/src/cloud/cloud_cumulative_compaction.cpp

+38-8
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine,
4848
CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;
4949

5050
Status CloudCumulativeCompaction::prepare_compact() {
51-
if (_tablet->tablet_state() != TABLET_RUNNING) {
51+
if (_tablet->tablet_state() != TABLET_RUNNING &&
52+
(!config::enable_new_tablet_do_compaction ||
53+
static_cast<CloudTablet*>(_tablet.get())->alter_version() == -1)) {
5254
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
5355
}
5456

@@ -110,11 +112,11 @@ Status CloudCumulativeCompaction::prepare_compact() {
110112
_expiration = now + config::compaction_timeout_seconds;
111113
compaction_job->set_expiration(_expiration);
112114
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
113-
if (config::enable_parallel_cumu_compaction) {
114-
// Set input version range to let meta-service judge version range conflict
115-
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
116-
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
117-
}
115+
116+
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
117+
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
118+
// Set input version range to let meta-service check version range conflict
119+
compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction);
118120
cloud::StartTabletJobResponse resp;
119121
st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
120122
if (!st.ok()) {
@@ -141,6 +143,18 @@ Status CloudCumulativeCompaction::prepare_compact() {
141143
.tag("msg", resp.status().msg());
142144
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions");
143145
}
146+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
147+
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
148+
std::stringstream ss;
149+
ss << "failed to prepare cumu compaction. Check compaction input versions "
150+
"failed in schema change. "
151+
"input_version_start="
152+
<< compaction_job->input_versions(0)
153+
<< " input_version_end=" << compaction_job->input_versions(1)
154+
<< " schema_change_alter_version=" << resp.alter_version();
155+
std::string msg = ss.str();
156+
LOG(WARNING) << msg;
157+
return Status::InternalError(msg);
144158
}
145159
return st;
146160
}
@@ -256,12 +270,27 @@ Status CloudCumulativeCompaction::modify_rowsets() {
256270

257271
cloud::FinishTabletJobResponse resp;
258272
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
273+
if (resp.has_alter_version()) {
274+
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
275+
}
259276
if (!st.ok()) {
260277
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
261278
cloud_tablet()->clear_cache();
279+
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
280+
std::stringstream ss;
281+
ss << "failed to prepare cumu compaction. Check compaction input versions "
282+
"failed in schema change. "
283+
"input_version_start="
284+
<< compaction_job->input_versions(0)
285+
<< " input_version_end=" << compaction_job->input_versions(1)
286+
<< " schema_change_alter_version=" << resp.alter_version();
287+
std::string msg = ss.str();
288+
LOG(WARNING) << msg;
289+
return Status::InternalError(msg);
262290
}
263291
return st;
264292
}
293+
265294
auto& stats = resp.stats();
266295
LOG(INFO) << "tablet stats=" << stats.ShortDebugString();
267296
{
@@ -344,8 +373,9 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
344373
std::shared_lock rlock(_tablet->get_header_lock());
345374
_base_compaction_cnt = cloud_tablet()->base_compaction_cnt();
346375
_cumulative_compaction_cnt = cloud_tablet()->cumulative_compaction_cnt();
347-
int64_t candidate_version =
348-
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1);
376+
int64_t candidate_version = std::max(
377+
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1),
378+
cloud_tablet()->alter_version() + 1);
349379
// Get all rowsets whose version >= `candidate_version` as candidate rowsets
350380
cloud_tablet()->traverse_rowsets(
351381
[&candidate_rowsets, candidate_version](const RowsetSharedPtr& rs) {

be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp

+10-8
Original file line numberDiff line numberDiff line change
@@ -154,20 +154,22 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
154154
};
155155
if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) {
156156
auto sync_st = tablet->sync_rowsets();
157-
if (sync_st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
158-
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
159-
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
160-
"tablet_id: "
161-
<< _tablet_id << " txn_id: " << _transaction_id
162-
<< ", request_version=" << _version;
163-
return sync_st;
164-
}
165157
if (!sync_st.ok()) {
166158
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
167159
<< ", txn_id=" << _transaction_id << ", status=" << sync_st;
168160
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
169161
return sync_st;
170162
}
163+
if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
164+
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
165+
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
166+
"tablet_id: "
167+
<< _tablet_id << " txn_id: " << _transaction_id
168+
<< ", request_version=" << _version;
169+
return Status::Error<ErrorCode::INVALID_TABLET_STATE>(
170+
"invalid tablet state {}. tablet_id={}", tablet->tablet_state(),
171+
tablet->tablet_id());
172+
}
171173
}
172174
auto sync_rowset_time_us = MonotonicMicros() - t2;
173175
max_version = tablet->max_version_unlocked();

be/src/cloud/cloud_meta_mgr.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
448448
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
449449
tablet->last_sync_time_s = now;
450450

451-
if (tablet->enable_unique_key_merge_on_write()) {
451+
// If is mow, the tablet has no delete bitmap in base rowsets.
452+
// So dont need to sync it.
453+
if (tablet->enable_unique_key_merge_on_write() &&
454+
tablet->tablet_state() == TABLET_RUNNING) {
452455
DeleteBitmap delete_bitmap(tablet_id);
453456
int64_t old_max_version = req.start_version() - 1;
454457
auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),

0 commit comments

Comments
 (0)