Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix](merge-on-write) Fix duplicate key problem after adding sequence column for merge-on-write table #39958

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,21 +427,22 @@ Status BaseTablet::lookup_row_data(const Slice& encoded_key, const RowLocation&
return Status::OK();
}

Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,
Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest_schema,
bool with_seq_col,
const std::vector<RowsetSharedPtr>& specified_rowsets,
RowLocation* row_location, uint32_t version,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
RowsetSharedPtr* rowset, bool with_rowid) {
SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency);
size_t seq_col_length = 0;
if (_tablet_meta->tablet_schema()->has_sequence_col() && with_seq_col) {
seq_col_length = _tablet_meta->tablet_schema()
->column(_tablet_meta->tablet_schema()->sequence_col_idx())
.length() +
1;
// use the latest tablet schema to decide if the tablet has sequence column currently
const TabletSchema* schema =
(latest_schema == nullptr ? _tablet_meta->tablet_schema().get() : latest_schema);
if (schema->has_sequence_col() && with_seq_col) {
seq_col_length = schema->column(schema->sequence_col_idx()).length() + 1;
}
size_t rowid_length = 0;
if (with_rowid && !_tablet_meta->tablet_schema()->cluster_key_idxes().empty()) {
if (with_rowid && !schema->cluster_key_idxes().empty()) {
rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH;
}
Slice key_without_seq =
Expand All @@ -457,7 +458,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,
for (int i = num_segments - 1; i >= 0; i--) {
// If mow table has cluster keys, the key bounds is short keys, not primary keys
// use PrimaryKeyIndexMetaPB in primary key index?
if (_tablet_meta->tablet_schema()->cluster_key_idxes().empty()) {
if (schema->cluster_key_idxes().empty()) {
if (key_without_seq.compare(segments_key_bounds[i].max_key()) > 0 ||
key_without_seq.compare(segments_key_bounds[i].min_key()) < 0) {
continue;
Expand All @@ -478,7 +479,8 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,
DCHECK_EQ(segments.size(), num_segments);

for (auto id : picked_segments) {
Status s = segments[id]->lookup_row_key(encoded_key, with_seq_col, with_rowid, &loc);
Status s = segments[id]->lookup_row_key(encoded_key, schema, with_seq_col, with_rowid,
&loc);
if (s.is<KEY_NOT_FOUND>()) {
continue;
}
Expand All @@ -489,7 +491,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,
{loc.rowset_id, loc.segment_id, version}, loc.row_id)) {
// if has sequence col, we continue to compare the sequence_id of
// all rowsets, until we find an existing key.
if (_tablet_meta->tablet_schema()->has_sequence_col()) {
if (schema->has_sequence_col()) {
continue;
}
// The key is deleted, we don't need to search for it any more.
Expand Down Expand Up @@ -649,8 +651,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
}

RowsetSharedPtr rowset_find;
auto st = lookup_row_key(key, true, specified_rowsets, &loc, dummy_version.first - 1,
segment_caches, &rowset_find);
auto st = lookup_row_key(key, rowset_schema.get(), true, specified_rowsets, &loc,
dummy_version.first - 1, segment_caches, &rowset_find);
bool expected_st = st.ok() || st.is<KEY_NOT_FOUND>() || st.is<KEY_ALREADY_EXISTS>();
// It's a defensive DCHECK, we need to exclude some common errors to avoid core-dump
// while stress test
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class BaseTablet {
// Lookup the row location of `encoded_key`, the function sets `row_location` on success.
// NOTE: the method only works in unique key model with primary key index, you will got a
// not supported error in other data model.
Status lookup_row_key(const Slice& encoded_key, bool with_seq_col,
Status lookup_row_key(const Slice& encoded_key, TabletSchema* latest_schema, bool with_seq_col,
const std::vector<RowsetSharedPtr>& specified_rowsets,
RowLocation* row_location, uint32_t version,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
Expand Down
30 changes: 16 additions & 14 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,14 +745,14 @@ Status Segment::new_inverted_index_iterator(const TabletColumn& tablet_column,
return Status::OK();
}

Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, bool with_rowid,
RowLocation* row_location) {
Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_schema,
bool with_seq_col, bool with_rowid, RowLocation* row_location) {
RETURN_IF_ERROR(load_pk_index_and_bf());
bool has_seq_col = _tablet_schema->has_sequence_col();
bool has_rowid = !_tablet_schema->cluster_key_idxes().empty();
bool has_seq_col = latest_schema->has_sequence_col();
bool has_rowid = !latest_schema->cluster_key_idxes().empty();
size_t seq_col_length = 0;
if (has_seq_col) {
seq_col_length = _tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1;
seq_col_length = latest_schema->column(latest_schema->sequence_col_idx()).length() + 1;
}
size_t rowid_length = has_rowid ? PrimaryKeyIndexReader::ROW_ID_LENGTH : 0;

Expand Down Expand Up @@ -788,16 +788,20 @@ Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, bool with_ro

Slice sought_key = Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size);

// user may use "ALTER TABLE tbl ENABLE FEATURE "SEQUENCE_LOAD" WITH ..." to add a hidden sequence column
// for a merge-on-write table which doesn't have sequence column, so `has_seq_col == true` doesn't mean
// data in segment has sequence column value
bool segment_has_seq_col = _tablet_schema->has_sequence_col();
Slice sought_key_without_seq = Slice(
sought_key.get_data(),
sought_key.get_size() - (segment_has_seq_col ? seq_col_length : 0) - rowid_length);
if (has_seq_col) {
Slice sought_key_without_seq =
Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length - rowid_length);

// compare key
if (key_without_seq.compare(sought_key_without_seq) != 0) {
return Status::Error<ErrorCode::KEY_NOT_FOUND>("Can't find key in the segment");
}

if (with_seq_col) {
if (with_seq_col && segment_has_seq_col) {
// compare sequence id
Slice sequence_id =
Slice(key.get_data() + key_without_seq.get_size() + 1, seq_col_length - 1);
Expand All @@ -819,11 +823,9 @@ Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, bool with_ro
}
// found the key, use rowid in pk index if necessary.
if (has_rowid) {
Slice sought_key_without_seq =
Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length - rowid_length);
Slice rowid_slice = Slice(
sought_key.get_data() + sought_key_without_seq.get_size() + seq_col_length + 1,
rowid_length - 1);
Slice rowid_slice = Slice(sought_key.get_data() + sought_key_without_seq.get_size() +
(segment_has_seq_col ? seq_col_length : 0) + 1,
rowid_length - 1);
const auto* type_info = get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>();
const auto* rowid_coder = get_key_coder(type_info->type());
RETURN_IF_ERROR(rowid_coder->decode_ascending(&rowid_slice, rowid_length,
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
return _pk_index_reader.get();
}

Status lookup_row_key(const Slice& key, bool with_seq_col, bool with_rowid,
RowLocation* row_location);
Status lookup_row_key(const Slice& key, const TabletSchema* latest_schema, bool with_seq_col,
bool with_rowid, RowLocation* row_location);

Status read_key_by_rowid(uint32_t row_id, std::string* key);

Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,9 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
RowLocation loc;
// save rowset shared ptr so this rowset wouldn't delete
RowsetSharedPtr rowset;
auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset);
auto st = _tablet->lookup_row_key(key, _tablet_schema.get(), have_input_seq_column,
specified_rowsets, &loc, _mow_context->max_version,
segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++num_rows_filtered;
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,9 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
RowLocation loc;
// save rowset shared ptr so this rowset wouldn't delete
RowsetSharedPtr rowset;
auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset);
auto st = _tablet->lookup_row_key(key, _tablet_schema.get(), have_input_seq_column,
specified_rowsets, &loc, _mow_context->max_version,
segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++num_rows_filtered;
Expand Down
6 changes: 3 additions & 3 deletions be/src/service/point_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,9 @@ Status PointQueryExecutor::_lookup_row_key() {
}
// Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this ptr
auto rowset_ptr = std::make_unique<RowsetSharedPtr>();
st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, false, specified_rowsets,
&location, INT32_MAX /*rethink?*/, segment_caches,
rowset_ptr.get(), false));
st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr, false,
specified_rowsets, &location, INT32_MAX /*rethink?*/,
segment_caches, rowset_ptr.get(), false));
if (st.is<ErrorCode::KEY_NOT_FOUND>()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
111 aaa bbb 11
222 bbb bbb 11
333 ccc ddd 11

-- !sql --
111 aaa bbb 11 \N 0 2
222 bbb bbb 11 \N 0 3
333 ccc ddd 11 \N 0 4

-- !sql --
111 zzz yyy 100 99 0 5
222 xxx www 400 99 0 8
333 ccc ddd 11 \N 0 4

Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test: a merge-on-write (MOW) unique-key table created WITHOUT a
// sequence column later gains one via ALTER TABLE ... ENABLE FEATURE
// "SEQUENCE_LOAD". Verifies the hidden __DORIS_SEQUENCE_COL__ appears only
// after the alter, and that subsequent sequence-aware upserts leave exactly
// one row per key (no duplicate-key regression against pre-alter segments).
suite("test_mow_enable_sequence_col") {

def tableName = "test_mow_enable_sequence_col"
// Recreate the table from scratch; "force" drops it immediately (skips recycle bin).
sql """ DROP TABLE IF EXISTS ${tableName} force;"""
// MOW unique-key table, single bucket; auto compaction is disabled so the
// rowsets written before the schema change stay exactly as written.
// NOTE(review): PROPERTIES values are normally quoted strings — confirm the
// unquoted `"disable_auto_compaction" = true` is accepted by the parser.
sql """CREATE TABLE IF NOT EXISTS ${tableName}
(`user_id` BIGINT NOT NULL,
`username` VARCHAR(50) NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT)
UNIQUE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"disable_auto_compaction" = true,
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true");"""

// Seed three rows while the table has NO sequence column; these land in
// segments whose primary-key index carries no sequence suffix.
sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`) VALUES(111,'aaa','bbb',11);"""
sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`) VALUES(222,'bbb','bbb',11);"""
sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`) VALUES(333,'ccc','ddd',11);"""
order_qt_sql "select * from ${tableName};"

// Before the alter: the hidden sequence column must NOT exist yet.
sql "set show_hidden_columns = true;"
sql "sync;"
def res = sql "desc ${tableName} all;"
assertTrue(!res.toString().contains("__DORIS_SEQUENCE_COL__"))
sql "set show_hidden_columns = false;"
sql "sync;"

// Helper: run a schema-change DDL and block until the latest alter job
// on this table reports done (polls SHOW ALTER TABLE COLUMN).
def doSchemaChange = { cmd ->
sql cmd
waitForSchemaChangeDone {
sql """SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1"""
time 2000
}
}
// Add a hidden bigint sequence column to the existing MOW table.
doSchemaChange """ALTER TABLE ${tableName} ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type" = "bigint");"""

// After the alter: the hidden sequence column must now be visible, and the
// pre-alter rows must still read back correctly.
sql "set show_hidden_columns = true;"
sql "sync;"
res = sql "desc ${tableName} all;"
assertTrue(res.toString().contains("__DORIS_SEQUENCE_COL__"))
order_qt_sql "select * from ${tableName};"
sql "set show_hidden_columns = false;"
sql "sync;"

// Upsert existing keys with explicit sequence values. For each key the row
// with the highest sequence must win (seq 99 beats 88 for key 111; 99 beats
// 77 for key 222); key 333 is untouched. The final select must show exactly
// one row per key.
sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, `__DORIS_SEQUENCE_COL__`) VALUES(111,'zzz','yyy',100,99);"""
sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, `__DORIS_SEQUENCE_COL__`) VALUES(111,'hhh','mmm',200,88);"""
sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, `__DORIS_SEQUENCE_COL__`) VALUES(222,'qqq','ppp',300,77);"""
sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, `__DORIS_SEQUENCE_COL__`) VALUES(222,'xxx','www',400,99);"""

// Final check with hidden columns shown, compared against the expected-output file.
sql "set show_hidden_columns = true;"
sql "sync;"
order_qt_sql "select * from ${tableName};"
}
Loading