|
22 | 22 |
|
23 | 23 | #include <chrono>
|
24 | 24 | #include <cstddef>
|
| 25 | +#include <sstream> |
25 | 26 |
|
26 |
| -#include "common/bvars.h" |
27 | 27 | #include "common/config.h"
|
28 | 28 | #include "common/logging.h"
|
29 |
| -#include "common/stopwatch.h" |
30 | 29 | #include "common/util.h"
|
31 | 30 | #include "cpp/sync_point.h"
|
32 | 31 | #include "meta-service/keys.h"
|
@@ -58,21 +57,33 @@ static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2;
|
58 | 57 | // 2. When if cu compaction, we need to guarantee the start version
|
59 | 58 | // is large than alter_version.
|
60 | 59 | bool check_compaction_input_verions(const TabletCompactionJobPB& compaction,
|
61 |
| - const TabletJobInfoPB& job_pb) { |
62 |
| - if (!job_pb.has_schema_change() || !job_pb.schema_change().has_alter_version()) return true; |
| 60 | + const TabletJobInfoPB& job_pb, std::stringstream& ss) { |
| 61 | + if (!job_pb.has_schema_change() || !job_pb.schema_change().has_alter_version()) { |
| 62 | + return true; |
| 63 | + } |
| 64 | + if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) { |
| 65 | + return true; |
| 66 | + } |
63 | 67 | if (compaction.input_versions_size() != 2 ||
|
64 | 68 | compaction.input_versions(0) > compaction.input_versions(1)) {
|
65 |
| - LOG(WARNING) << "The compaction need to know [start_version, end_version], and \ |
66 |
| - the start_version should LE end_version. \n" |
67 |
| - << proto_to_json(compaction); |
| 69 | + SS << "The compaction need to know [start_version, end_version], and the start_version " |
| 70 | + "should LE end_version. \n" |
| 71 | + << "compaction job=" << proto_to_json(compaction); |
68 | 72 | return false;
|
69 | 73 | }
|
70 | 74 |
|
71 | 75 | int64_t alter_version = job_pb.schema_change().alter_version();
|
72 |
| - return (compaction.type() == TabletCompactionJobPB_CompactionType_BASE && |
73 |
| - compaction.input_versions(1) <= alter_version) || |
74 |
| - (compaction.type() == TabletCompactionJobPB_CompactionType_CUMULATIVE && |
75 |
| - compaction.input_versions(0) > alter_version); |
| 76 | + bool legal = (compaction.type() == TabletCompactionJobPB::BASE && |
| 77 | + compaction.input_versions(1) <= alter_version) || |
| 78 | + (compaction.type() == TabletCompactionJobPB::CUMULATIVE && |
| 79 | + compaction.input_versions(0) > alter_version); |
| 80 | + if (legal) { |
| 81 | + return true; |
| 82 | + } |
| 83 | + SS << "Check compaction input versions failed in schema change. input_version_start=" |
| 84 | + << compaction.input_versions(0) << " input_version_end=" << compaction.input_versions(1) |
| 85 | + << " schema_change_alter_version=" << job_pb.schema_change().alter_version(); |
| 86 | + return false; |
76 | 87 | }
|
77 | 88 |
|
78 | 89 | void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringstream& ss,
|
@@ -150,11 +161,7 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst
|
150 | 161 | }
|
151 | 162 | while (err == TxnErrorCode::TXN_OK) {
|
152 | 163 | job_pb.ParseFromString(job_val);
|
153 |
| - if (!check_compaction_input_verions(compaction, job_pb)) { |
154 |
| - SS << "Check compaction input versions failed in schema change. input_version_start=" |
155 |
| - << compaction.input_versions(0) |
156 |
| - << " input_version_end=" << compaction.input_versions(1) |
157 |
| - << " schema_change_alter_version=" << job_pb.schema_change().alter_version(); |
| 164 | + if (!check_compaction_input_verions(compaction, job_pb, ss)) { |
158 | 165 | msg = ss.str();
|
159 | 166 | INSTANCE_LOG(INFO) << msg;
|
160 | 167 | code = MetaServiceCode::JOB_CHECK_ALTER_VERSION;
|
@@ -612,10 +619,7 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
|
612 | 619 |
|
613 | 620 | bool abort_compaction = false;
|
614 | 621 | if (request->action() == FinishTabletJobRequest::COMMIT &&
|
615 |
| - !check_compaction_input_verions(compaction, recorded_job)) { |
616 |
| - SS << "Check compaction input versions failed in schema change. input_version_start=" |
617 |
| - << compaction.input_versions(0) << " input_version_end=" << compaction.input_versions(1) |
618 |
| - << " schema_change_alter_version=" << recorded_job.schema_change().alter_version(); |
| 622 | + !check_compaction_input_verions(compaction, recorded_job, ss)) { |
619 | 623 | msg = ss.str();
|
620 | 624 | INSTANCE_LOG(INFO) << msg;
|
621 | 625 | abort_compaction = true;
|
|
0 commit comments