From 8207b0d452639081d207d750f49442c8d8872c33 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 27 Aug 2024 11:27:17 +0800 Subject: [PATCH] fix --- be/src/olap/base_tablet.cpp | 26 +++---- be/src/olap/base_tablet.h | 2 +- be/src/olap/rowset/segment_v2/segment.cpp | 30 ++++---- be/src/olap/rowset/segment_v2/segment.h | 4 +- .../olap/rowset/segment_v2/segment_writer.cpp | 5 +- .../segment_v2/vertical_segment_writer.cpp | 5 +- be/src/service/point_query_executor.cpp | 6 +- .../test_mow_enable_sequence_col.out | 16 +++++ .../test_mow_enable_sequence_col.groovy | 72 +++++++++++++++++++ 9 files changed, 130 insertions(+), 36 deletions(-) create mode 100644 regression-test/data/unique_with_mow_p0/test_mow_enable_sequence_col.out create mode 100644 regression-test/suites/unique_with_mow_p0/test_mow_enable_sequence_col.groovy diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 934b00f56698b8..7df39d8e881381 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -427,21 +427,22 @@ Status BaseTablet::lookup_row_data(const Slice& encoded_key, const RowLocation& return Status::OK(); } -Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col, +Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest_schema, + bool with_seq_col, const std::vector& specified_rowsets, RowLocation* row_location, uint32_t version, std::vector>& segment_caches, RowsetSharedPtr* rowset, bool with_rowid) { SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency); size_t seq_col_length = 0; - if (_tablet_meta->tablet_schema()->has_sequence_col() && with_seq_col) { - seq_col_length = _tablet_meta->tablet_schema() - ->column(_tablet_meta->tablet_schema()->sequence_col_idx()) - .length() + - 1; + // use the latest tablet schema to decide if the tablet has sequence column currently + const TabletSchema* schema = + (latest_schema == nullptr ? _tablet_meta->tablet_schema().get() : latest_schema); + if (schema->has_sequence_col() && with_seq_col) { + seq_col_length = schema->column(schema->sequence_col_idx()).length() + 1; } size_t rowid_length = 0; - if (with_rowid && !_tablet_meta->tablet_schema()->cluster_key_idxes().empty()) { + if (with_rowid && !schema->cluster_key_idxes().empty()) { rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH; } Slice key_without_seq = @@ -457,7 +458,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col, for (int i = num_segments - 1; i >= 0; i--) { // If mow table has cluster keys, the key bounds is short keys, not primary keys // use PrimaryKeyIndexMetaPB in primary key index? - if (_tablet_meta->tablet_schema()->cluster_key_idxes().empty()) { + if (schema->cluster_key_idxes().empty()) { if (key_without_seq.compare(segments_key_bounds[i].max_key()) > 0 || key_without_seq.compare(segments_key_bounds[i].min_key()) < 0) { continue; @@ -478,7 +479,8 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col, DCHECK_EQ(segments.size(), num_segments); for (auto id : picked_segments) { - Status s = segments[id]->lookup_row_key(encoded_key, with_seq_col, with_rowid, &loc); + Status s = segments[id]->lookup_row_key(encoded_key, schema, with_seq_col, with_rowid, + &loc); if (s.is()) { continue; } @@ -489,7 +491,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col, {loc.rowset_id, loc.segment_id, version}, loc.row_id)) { // if has sequence col, we continue to compare the sequence_id of // all rowsets, util we find an existing key. - if (_tablet_meta->tablet_schema()->has_sequence_col()) { + if (schema->has_sequence_col()) { continue; } // The key is deleted, we don't need to search for it any more. @@ -649,8 +651,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, } RowsetSharedPtr rowset_find; - auto st = lookup_row_key(key, true, specified_rowsets, &loc, dummy_version.first - 1, - segment_caches, &rowset_find); + auto st = lookup_row_key(key, rowset_schema.get(), true, specified_rowsets, &loc, + dummy_version.first - 1, segment_caches, &rowset_find); bool expected_st = st.ok() || st.is() || st.is(); // It's a defensive DCHECK, we need to exclude some common errors to avoid core-dump // while stress test diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index ab289822df891f..cfaf536902e03e 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -145,7 +145,7 @@ class BaseTablet { // Lookup the row location of `encoded_key`, the function sets `row_location` on success. // NOTE: the method only works in unique key model with primary key index, you will got a // not supported error in other data model. - Status lookup_row_key(const Slice& encoded_key, bool with_seq_col, + Status lookup_row_key(const Slice& encoded_key, TabletSchema* latest_schema, bool with_seq_col, const std::vector& specified_rowsets, RowLocation* row_location, uint32_t version, std::vector>& segment_caches, diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 2666fc8b633e1a..68a186205b3ce3 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -745,14 +745,14 @@ Status Segment::new_inverted_index_iterator(const TabletColumn& tablet_column, return Status::OK(); } -Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, bool with_rowid, - RowLocation* row_location) { +Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_schema, + bool with_seq_col, bool with_rowid, RowLocation* row_location) { RETURN_IF_ERROR(load_pk_index_and_bf()); - bool has_seq_col = _tablet_schema->has_sequence_col(); - bool has_rowid = !_tablet_schema->cluster_key_idxes().empty(); + bool has_seq_col = latest_schema->has_sequence_col(); + bool has_rowid = !latest_schema->cluster_key_idxes().empty(); size_t seq_col_length = 0; if (has_seq_col) { - seq_col_length = _tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1; + seq_col_length = latest_schema->column(latest_schema->sequence_col_idx()).length() + 1; } size_t rowid_length = has_rowid ? PrimaryKeyIndexReader::ROW_ID_LENGTH : 0; @@ -788,16 +788,20 @@ Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, bool with_ro Slice sought_key = Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size); + // user may use "ALTER TABLE tbl ENABLE FEATURE "SEQUENCE_LOAD" WITH ..." to add a hidden sequence column + // for a merge-on-write table which doesn't have sequence column, so `has_seq_col == true` doesn't mean + // data in segment has sequence column value + bool segment_has_seq_col = _tablet_schema->has_sequence_col(); + Slice sought_key_without_seq = Slice( + sought_key.get_data(), + sought_key.get_size() - (segment_has_seq_col ? seq_col_length : 0) - rowid_length); if (has_seq_col) { - Slice sought_key_without_seq = - Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length - rowid_length); - // compare key if (key_without_seq.compare(sought_key_without_seq) != 0) { return Status::Error("Can't find key in the segment"); } - if (with_seq_col) { + if (with_seq_col && segment_has_seq_col) { // compare sequence id Slice sequence_id = Slice(key.get_data() + key_without_seq.get_size() + 1, seq_col_length - 1); @@ -819,11 +823,9 @@ Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, bool with_ro } // found the key, use rowid in pk index if necessary. if (has_rowid) { - Slice sought_key_without_seq = - Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length - rowid_length); - Slice rowid_slice = Slice( - sought_key.get_data() + sought_key_without_seq.get_size() + seq_col_length + 1, - rowid_length - 1); + Slice rowid_slice = Slice(sought_key.get_data() + sought_key_without_seq.get_size() + + (segment_has_seq_col ? seq_col_length : 0) + 1, + rowid_length - 1); const auto* type_info = get_scalar_type_info(); const auto* rowid_coder = get_key_coder(type_info->type()); RETURN_IF_ERROR(rowid_coder->decode_ascending(&rowid_slice, rowid_length, diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index dd61e7eb831207..46484ce919cf7c 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -128,8 +128,8 @@ class Segment : public std::enable_shared_from_this { return _pk_index_reader.get(); } - Status lookup_row_key(const Slice& key, bool with_seq_col, bool with_rowid, - RowLocation* row_location); + Status lookup_row_key(const Slice& key, const TabletSchema* latest_schema, bool with_seq_col, + bool with_rowid, RowLocation* row_location); Status read_key_by_rowid(uint32_t row_id, std::string* key); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 2c94942bac08c7..105433d2689e01 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -584,8 +584,9 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* RowLocation loc; // save rowset shared ptr so this rowset wouldn't delete RowsetSharedPtr rowset; - auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc, - _mow_context->max_version, segment_caches, &rowset); + auto st = _tablet->lookup_row_key(key, _tablet_schema.get(), have_input_seq_column, + specified_rowsets, &loc, _mow_context->max_version, + segment_caches, &rowset); if (st.is()) { if (_opts.rowset_ctx->partial_update_info->is_strict_mode) { ++num_rows_filtered; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 891fd8c6a10ce6..64f72bc0c4669d 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -444,8 +444,9 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da RowLocation loc; // save rowset shared ptr so this rowset wouldn't delete RowsetSharedPtr rowset; - auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc, - _mow_context->max_version, segment_caches, &rowset); + auto st = _tablet->lookup_row_key(key, _tablet_schema.get(), have_input_seq_column, + specified_rowsets, &loc, _mow_context->max_version, + segment_caches, &rowset); if (st.is()) { if (_opts.rowset_ctx->partial_update_info->is_strict_mode) { ++num_rows_filtered; diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index da79726f389829..88293ba9b03675 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -383,9 +383,9 @@ Status PointQueryExecutor::_lookup_row_key() { } // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this ptr auto rowset_ptr = std::make_unique(); - st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, false, specified_rowsets, - &location, INT32_MAX /*rethink?*/, segment_caches, - rowset_ptr.get(), false)); + st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr, false, + specified_rowsets, &location, INT32_MAX /*rethink?*/, + segment_caches, rowset_ptr.get(), false)); if (st.is()) { continue; } diff --git a/regression-test/data/unique_with_mow_p0/test_mow_enable_sequence_col.out b/regression-test/data/unique_with_mow_p0/test_mow_enable_sequence_col.out new file mode 100644 index 00000000000000..d99510cfbcb80f --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/test_mow_enable_sequence_col.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +111 aaa bbb 11 +222 bbb bbb 11 +333 ccc ddd 11 + +-- !sql -- +111 aaa bbb 11 \N 0 2 +222 bbb bbb 11 \N 0 3 +333 ccc ddd 11 \N 0 4 + +-- !sql -- +111 zzz yyy 100 99 0 5 +222 xxx www 400 99 0 8 +333 ccc ddd 11 \N 0 4 + diff --git a/regression-test/suites/unique_with_mow_p0/test_mow_enable_sequence_col.groovy b/regression-test/suites/unique_with_mow_p0/test_mow_enable_sequence_col.groovy new file mode 100644 index 00000000000000..2cfb8133fd659d --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/test_mow_enable_sequence_col.groovy @@ -0,0 +1,72 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mow_enable_sequence_col") { + + def tableName = "test_mow_enable_sequence_col" + sql """ DROP TABLE IF EXISTS ${tableName} force;""" + sql """CREATE TABLE IF NOT EXISTS ${tableName} + (`user_id` BIGINT NOT NULL, + `username` VARCHAR(50) NOT NULL, + `city` VARCHAR(20), + `age` SMALLINT) + UNIQUE KEY(`user_id`) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = true, + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true");""" + + sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`) VALUES(111,'aaa','bbb',11);""" + sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`) VALUES(222,'bbb','bbb',11);""" + sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`) VALUES(333,'ccc','ddd',11);""" + order_qt_sql "select * from ${tableName};" + + sql "set show_hidden_columns = true;" + sql "sync;" + def res = sql "desc ${tableName} all;" + assertTrue(!res.toString().contains("__DORIS_SEQUENCE_COL__")) + sql "set show_hidden_columns = false;" + sql "sync;" + + def doSchemaChange = { cmd -> + sql cmd + waitForSchemaChangeDone { + sql """SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1""" + time 2000 + } + } + doSchemaChange """ALTER TABLE ${tableName} ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type" = "bigint");""" + + sql "set show_hidden_columns = true;" + sql "sync;" + res = sql "desc ${tableName} all;" + assertTrue(res.toString().contains("__DORIS_SEQUENCE_COL__")) + order_qt_sql "select * from ${tableName};" + sql "set show_hidden_columns = false;" + sql "sync;" + + sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, `__DORIS_SEQUENCE_COL__`) VALUES(111,'zzz','yyy',100,99);""" + sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, `__DORIS_SEQUENCE_COL__`) VALUES(111,'hhh','mmm',200,88);""" + sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, `__DORIS_SEQUENCE_COL__`) VALUES(222,'qqq','ppp',300,77);""" + sql """insert into ${tableName}(`user_id`,`username`,`city`,`age`, `__DORIS_SEQUENCE_COL__`) VALUES(222,'xxx','www',400,99);""" + + sql "set show_hidden_columns = true;" + sql "sync;" + order_qt_sql "select * from ${tableName};" +}