From 6d95aa2a081affb3a7e8fcd7e3b2b6d2529b42aa Mon Sep 17 00:00:00 2001 From: amorynan Date: Sat, 2 Sep 2023 09:43:44 +0800 Subject: [PATCH 1/6] fix array map batch append data with right next_array_item_rowid --- .../olap/rowset/segment_v2/column_writer.cpp | 80 ++++++++++++++++++- be/src/olap/rowset/segment_v2/column_writer.h | 11 ++- 2 files changed, 89 insertions(+), 2 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index ec2baa10f0bb8d..c4b3aa2398cf21 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -530,7 +530,12 @@ Status ScalarColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { remaining -= num_written; if (_page_builder->is_page_full()) { - RETURN_IF_ERROR(finish_current_page()); + // get next data for next array_item_rowid + if (remaining == 0) { + RETURN_IF_ERROR(finish_current_page()); + } else { + RETURN_IF_ERROR(finish_current_page(*ptr)); + } } } return Status::OK(); @@ -729,6 +734,65 @@ Status ScalarColumnWriter::finish_current_page() { return Status::OK(); } +Status ScalarColumnWriter::finish_current_page(const uint8_t* next_data_ptr) { + if (_next_rowid == _first_rowid) { + return Status::OK(); + } + if (_opts.need_zone_map) { + if (_next_rowid - _first_rowid < config::zone_map_row_num_threshold) { + _zone_map_index_builder->reset_page_zone_map(); + } + RETURN_IF_ERROR(_zone_map_index_builder->flush()); + } + + if (_opts.need_bloom_filter) { + RETURN_IF_ERROR(_bloom_filter_index_builder->flush()); + } + + // build data page body : encoded values + [nullmap] + std::vector body; + OwnedSlice encoded_values = _page_builder->finish(); + _page_builder->reset(); + body.push_back(encoded_values.slice()); + + OwnedSlice nullmap; + if (_null_bitmap_builder != nullptr) { + if (is_nullable() && _null_bitmap_builder->has_null()) { + nullmap = _null_bitmap_builder->finish(); + body.push_back(nullmap.slice()); + } + _null_bitmap_builder->reset(); + } + + // prepare data page footer + std::unique_ptr page(new Page()); + page->footer.set_type(DATA_PAGE); + page->footer.set_uncompressed_size(Slice::compute_total_size(body)); + auto data_page_footer = page->footer.mutable_data_page_footer(); + data_page_footer->set_first_ordinal(_first_rowid); + data_page_footer->set_num_values(_next_rowid - _first_rowid); + data_page_footer->set_nullmap_size(nullmap.slice().size); + if (_new_page_callback != nullptr) { + _new_page_callback->put_extra_info_in_page(data_page_footer, next_data_ptr); + } + // trying to compress page body + OwnedSlice compressed_body; + RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving, + body, &compressed_body)); + if (compressed_body.slice().empty()) { + // page body is uncompressed + page->data.emplace_back(std::move(encoded_values)); + page->data.emplace_back(std::move(nullmap)); + } else { + // page body is compressed + page->data.emplace_back(std::move(compressed_body)); + } + + _push_back_page(page.release()); + _first_rowid = _next_rowid; + return Status::OK(); +} + //////////////////////////////////////////////////////////////////////////////// StructColumnWriter::StructColumnWriter( @@ -890,6 +954,13 @@ Status ArrayColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) { return Status::OK(); } +Status ArrayColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer, + const uint8_t* next_data_ptr) { + auto offset = *(const uint64_t*)next_data_ptr; + footer->set_next_array_item_ordinal(offset); + return Status::OK(); +} + Status ArrayColumnWriter::write_inverted_index() { if (_opts.inverted_index) { return _inverted_index_builder->finish(); @@ -1144,6 +1215,13 @@ Status MapColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) { return Status::OK(); } +Status MapColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer, + const uint8_t* next_data_ptr) { + auto offset = *(const uint64_t*)next_data_ptr; + footer->set_next_array_item_ordinal(offset); + return Status::OK(); +} + Status MapColumnWriter::write_inverted_index() { if (_opts.inverted_index) { return _inverted_index_builder->finish(); diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index b5aabd4e3ab871..b3847f0f4839dc 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -165,6 +165,9 @@ class FlushPageCallback { public: virtual ~FlushPageCallback() = default; virtual Status put_extra_info_in_page(DataPageFooterPB* footer) { return Status::OK(); } + virtual Status put_extra_info_in_page(DataPageFooterPB* footer, const uint8_t* next_data_ptr) { + return Status::OK(); + } }; // Encode one column's data into some memory slice. @@ -184,6 +187,10 @@ class ScalarColumnWriter final : public ColumnWriter { Status finish_current_page() override; + // this method is used for pass next data when current page is full and next data need in extra + // info + Status finish_current_page(const uint8_t* next_data_ptr); + uint64_t estimate_buffer_size() override; // finish append data @@ -374,6 +381,8 @@ class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback { private: Status put_extra_info_in_page(DataPageFooterPB* header) override; + + Status put_extra_info_in_page(DataPageFooterPB* header, const uint8_t* next_data_ptr) override; Status write_null_column(size_t num_rows, bool is_null); // 写入num_rows个null标记 bool has_empty_items() const { return _item_writer->get_next_rowid() == 0; } @@ -433,7 +442,7 @@ class MapColumnWriter final : public ColumnWriter, public FlushPageCallback { private: Status put_extra_info_in_page(DataPageFooterPB* header) override; - + Status put_extra_info_in_page(DataPageFooterPB* header, const uint8_t* next_data_ptr) override; std::vector> _kv_writers; // we need null writer to make sure a row is null or not std::unique_ptr _null_writer; From b41554ea360755071d1188aa43e958f182e5e01a Mon Sep 17 00:00:00 2001 From: amorynan Date: Mon, 4 Sep 2023 12:26:23 +0800 Subject: [PATCH 2/6] add offsets column writer --- .../olap/rowset/segment_v2/column_writer.cpp | 112 ++++++------------ be/src/olap/rowset/segment_v2/column_writer.h | 35 +++--- 2 files changed, 57 insertions(+), 90 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index c4b3aa2398cf21..aaa2dd2ce334db 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -221,7 +221,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* length_column.set_index_length(-1); // no short key index std::unique_ptr bigint_field(FieldFactory::create(length_column)); auto* length_writer = - new ScalarColumnWriter(length_options, std::move(bigint_field), file_writer); + new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer); // if nullable, create null writer ScalarColumnWriter* null_writer = nullptr; @@ -314,7 +314,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* length_column.set_index_length(-1); // no short key index std::unique_ptr bigint_field(FieldFactory::create(length_column)); auto* length_writer = - new ScalarColumnWriter(length_options, std::move(bigint_field), file_writer); + new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer); // create null writer if (opts.meta->is_nullable()) { @@ -530,12 +530,7 @@ Status ScalarColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { remaining -= num_written; if (_page_builder->is_page_full()) { - // get next data for next array_item_rowid - if (remaining == 0) { - RETURN_IF_ERROR(finish_current_page()); - } else { - RETURN_IF_ERROR(finish_current_page(*ptr)); - } + RETURN_IF_ERROR(finish_current_page()); } } return Status::OK(); @@ -734,66 +729,45 @@ Status ScalarColumnWriter::finish_current_page() { return Status::OK(); } -Status ScalarColumnWriter::finish_current_page(const uint8_t* next_data_ptr) { - if (_next_rowid == _first_rowid) { - return Status::OK(); - } - if (_opts.need_zone_map) { - if (_next_rowid - _first_rowid < config::zone_map_row_num_threshold) { - _zone_map_index_builder->reset_page_zone_map(); - } - RETURN_IF_ERROR(_zone_map_index_builder->flush()); - } +//////////////////////////////////////////////////////////////////////////////// - if (_opts.need_bloom_filter) { - RETURN_IF_ERROR(_bloom_filter_index_builder->flush()); - } +//////////////////////////////////////////////////////////////////////////////// +// offset column writer +//////////////////////////////////////////////////////////////////////////////// - // build data page body : encoded values + [nullmap] - std::vector body; - OwnedSlice encoded_values = _page_builder->finish(); - _page_builder->reset(); - body.push_back(encoded_values.slice()); +OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts, + std::unique_ptr field, io::FileWriter* file_writer) + : ScalarColumnWriter(opts, std::move(field), file_writer) {} - OwnedSlice nullmap; - if (_null_bitmap_builder != nullptr) { - if (is_nullable() && _null_bitmap_builder->has_null()) { - nullmap = _null_bitmap_builder->finish(); - body.push_back(nullmap.slice()); - } - _null_bitmap_builder->reset(); - } +OffsetColumnWriter::~OffsetColumnWriter() = default; - // prepare data page footer - std::unique_ptr page(new Page()); - page->footer.set_type(DATA_PAGE); - page->footer.set_uncompressed_size(Slice::compute_total_size(body)); - auto data_page_footer = page->footer.mutable_data_page_footer(); - data_page_footer->set_first_ordinal(_first_rowid); - data_page_footer->set_num_values(_next_rowid - _first_rowid); - data_page_footer->set_nullmap_size(nullmap.slice().size); - if (_new_page_callback != nullptr) { - _new_page_callback->put_extra_info_in_page(data_page_footer, next_data_ptr); - } - // trying to compress page body - OwnedSlice compressed_body; - RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving, - body, &compressed_body)); - if (compressed_body.slice().empty()) { - // page body is uncompressed - page->data.emplace_back(std::move(encoded_values)); - page->data.emplace_back(std::move(nullmap)); - } else { - // page body is compressed - page->data.emplace_back(std::move(compressed_body)); - } +Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { + size_t remaining = num_rows; + while (remaining > 0) { + size_t num_written = remaining; + RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written)); - _push_back_page(page.release()); - _first_rowid = _next_rowid; + remaining -= num_written; + + if (_page_builder->is_page_full()) { + // get next data for next array_item_rowid + if (remaining == 0) { + RETURN_IF_ERROR(finish_current_page()); + } else { + RETURN_IF_ERROR(finish_current_page_with_next_data(*ptr)); + } + } + } return Status::OK(); } -//////////////////////////////////////////////////////////////////////////////// +Status OffsetColumnWriter::finish_current_page_with_next_data(const uint8_t* next_data_ptr) { + finish_current_page(); + DataPageFooterPB* footer = get_data_pages().tail->footer.mutable_data_page_footer(); + auto offset = *(const uint64_t*)next_data_ptr; + footer->set_next_array_item_ordinal(offset); + return Status::OK(); +} StructColumnWriter::StructColumnWriter( const ColumnWriterOptions& opts, std::unique_ptr field, @@ -917,7 +891,7 @@ Status StructColumnWriter::finish_current_page() { //////////////////////////////////////////////////////////////////////////////// ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, - ScalarColumnWriter* offset_writer, + OffsetColumnWriter* offset_writer, ScalarColumnWriter* null_writer, std::unique_ptr item_writer) : ColumnWriter(std::move(field), opts.meta->is_nullable()), @@ -954,13 +928,6 @@ Status ArrayColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) { return Status::OK(); } -Status ArrayColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer, - const uint8_t* next_data_ptr) { - auto offset = *(const uint64_t*)next_data_ptr; - footer->set_next_array_item_ordinal(offset); - return Status::OK(); -} - Status ArrayColumnWriter::write_inverted_index() { if (_opts.inverted_index) { return _inverted_index_builder->finish(); @@ -1079,7 +1046,7 @@ Status ArrayColumnWriter::finish_current_page() { /// ============================= MapColumnWriter =====================//// MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, - ScalarColumnWriter* null_writer, ScalarColumnWriter* offset_writer, + ScalarColumnWriter* null_writer, OffsetColumnWriter* offset_writer, std::vector>& kv_writers) : ColumnWriter(std::move(field), opts.meta->is_nullable()), _opts(opts) { CHECK_EQ(kv_writers.size(), 2); @@ -1215,13 +1182,6 @@ Status MapColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) { return Status::OK(); } -Status MapColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer, - const uint8_t* next_data_ptr) { - auto offset = *(const uint64_t*)next_data_ptr; - footer->set_next_array_item_ordinal(offset); - return Status::OK(); -} - Status MapColumnWriter::write_inverted_index() { if (_opts.inverted_index) { return _inverted_index_builder->finish(); diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index b3847f0f4839dc..0056179cd39dca 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -165,16 +165,13 @@ class FlushPageCallback { public: virtual ~FlushPageCallback() = default; virtual Status put_extra_info_in_page(DataPageFooterPB* footer) { return Status::OK(); } - virtual Status put_extra_info_in_page(DataPageFooterPB* footer, const uint8_t* next_data_ptr) { - return Status::OK(); - } }; // Encode one column's data into some memory slice. // Because some columns would be stored in a file, we should wait // until all columns has been finished, and then data can be written // to file -class ScalarColumnWriter final : public ColumnWriter { +class ScalarColumnWriter : public ColumnWriter { public: ScalarColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, io::FileWriter* file_writer); @@ -187,10 +184,6 @@ class ScalarColumnWriter final : public ColumnWriter { Status finish_current_page() override; - // this method is used for pass next data when current page is full and next data need in extra - // info - Status finish_current_page(const uint8_t* next_data_ptr); - uint64_t estimate_buffer_size() override; // finish append data @@ -215,6 +208,7 @@ class ScalarColumnWriter final : public ColumnWriter { Status append_data_in_current_page(const uint8_t* ptr, size_t* num_written); friend class ArrayColumnWriter; + friend class OffsetColumnWriter; private: std::unique_ptr _page_builder; @@ -261,6 +255,7 @@ class ScalarColumnWriter final : public ColumnWriter { } Status _write_data_page(Page* page); + PageHead get_data_pages() { return _pages; } private: io::FileWriter* _file_writer = nullptr; @@ -283,6 +278,20 @@ class ScalarColumnWriter final : public ColumnWriter { FlushPageCallback* _new_page_callback = nullptr; }; +class OffsetColumnWriter final : public ScalarColumnWriter { +public: + OffsetColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, + io::FileWriter* file_writer); + + ~OffsetColumnWriter() override; + + // this method is used for pass next data when current page is full and next data need in extra + // info + Status finish_current_page_with_next_data(const uint8_t* next_data_ptr); + + Status append_data(const uint8_t** ptr, size_t num_rows) override; +}; + class StructColumnWriter final : public ColumnWriter { public: explicit StructColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, @@ -338,7 +347,7 @@ class StructColumnWriter final : public ColumnWriter { class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback { public: explicit ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, - ScalarColumnWriter* offset_writer, ScalarColumnWriter* null_writer, + OffsetColumnWriter* offset_writer, ScalarColumnWriter* null_writer, std::unique_ptr item_writer); ~ArrayColumnWriter() override = default; @@ -382,12 +391,11 @@ class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback { private: Status put_extra_info_in_page(DataPageFooterPB* header) override; - Status put_extra_info_in_page(DataPageFooterPB* header, const uint8_t* next_data_ptr) override; Status write_null_column(size_t num_rows, bool is_null); // 写入num_rows个null标记 bool has_empty_items() const { return _item_writer->get_next_rowid() == 0; } private: - std::unique_ptr _offset_writer; + std::unique_ptr _offset_writer; std::unique_ptr _null_writer; std::unique_ptr _item_writer; std::unique_ptr _inverted_index_builder; @@ -397,7 +405,7 @@ class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback { class MapColumnWriter final : public ColumnWriter, public FlushPageCallback { public: explicit MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, - ScalarColumnWriter* null_writer, ScalarColumnWriter* offsets_writer, + ScalarColumnWriter* null_writer, OffsetColumnWriter* offsets_writer, std::vector>& _kv_writers); ~MapColumnWriter() override = default; @@ -442,11 +450,10 @@ class MapColumnWriter final : public ColumnWriter, public FlushPageCallback { private: Status put_extra_info_in_page(DataPageFooterPB* header) override; - Status put_extra_info_in_page(DataPageFooterPB* header, const uint8_t* next_data_ptr) override; std::vector> _kv_writers; // we need null writer to make sure a row is null or not std::unique_ptr _null_writer; - std::unique_ptr _offsets_writer; + std::unique_ptr _offsets_writer; std::unique_ptr _inverted_index_builder; ColumnWriterOptions _opts; }; From ed72071240d4655127b2bf59dbbb81b807e505e0 Mon Sep 17 00:00:00 2001 From: amorynan Date: Mon, 4 Sep 2023 14:41:47 +0800 Subject: [PATCH 3/6] add return status --- be/src/olap/rowset/segment_v2/column_writer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index aaa2dd2ce334db..376dd62f1b8814 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -762,7 +762,7 @@ Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { } Status OffsetColumnWriter::finish_current_page_with_next_data(const uint8_t* next_data_ptr) { - finish_current_page(); + RETURN_IF_ERROR(finish_current_page()); DataPageFooterPB* footer = get_data_pages().tail->footer.mutable_data_page_footer(); auto offset = *(const uint64_t*)next_data_ptr; footer->set_next_array_item_ordinal(offset); From b5739cad04b94926abf9306aab2a8d67924681fc Mon Sep 17 00:00:00 2001 From: amorynan Date: Mon, 4 Sep 2023 20:42:09 +0800 Subject: [PATCH 4/6] update --- .../olap/rowset/segment_v2/column_writer.cpp | 40 +++++++------------ be/src/olap/rowset/segment_v2/column_writer.h | 22 +++++----- be/src/vec/olap/olap_data_convertor.cpp | 4 +- 3 files changed, 29 insertions(+), 37 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index 376dd62f1b8814..ae937ae5417616 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -737,35 +737,38 @@ Status ScalarColumnWriter::finish_current_page() { OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, io::FileWriter* file_writer) - : ScalarColumnWriter(opts, std::move(field), file_writer) {} + : ScalarColumnWriter(opts, std::move(field), file_writer) { + // now we only explain data in offset column as uint64 + DCHECK(field->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT); +} OffsetColumnWriter::~OffsetColumnWriter() = default; +Status OffsetColumnWriter::init() { + RETURN_IF_ERROR(ScalarColumnWriter::init()); + register_flush_page_callback(this); + _next_offset = 0; + return Status::OK(); +} + Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { size_t remaining = num_rows; while (remaining > 0) { size_t num_written = remaining; RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written)); - + _next_offset = *(const uint64_t*)(*ptr); remaining -= num_written; if (_page_builder->is_page_full()) { // get next data for next array_item_rowid - if (remaining == 0) { - RETURN_IF_ERROR(finish_current_page()); - } else { - RETURN_IF_ERROR(finish_current_page_with_next_data(*ptr)); - } + RETURN_IF_ERROR(finish_current_page()); } } return Status::OK(); } -Status OffsetColumnWriter::finish_current_page_with_next_data(const uint8_t* next_data_ptr) { - RETURN_IF_ERROR(finish_current_page()); - DataPageFooterPB* footer = get_data_pages().tail->footer.mutable_data_page_footer(); - auto offset = *(const uint64_t*)next_data_ptr; - footer->set_next_array_item_ordinal(offset); +Status OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) { + footer->set_next_array_item_ordinal(_next_offset); return Status::OK(); } @@ -909,7 +912,6 @@ Status ArrayColumnWriter::init() { RETURN_IF_ERROR(_null_writer->init()); } RETURN_IF_ERROR(_item_writer->init()); - _offset_writer->register_flush_page_callback(this); if (_opts.inverted_index) { auto writer = dynamic_cast(_item_writer.get()); if (writer != nullptr) { @@ -923,11 +925,6 @@ Status ArrayColumnWriter::init() { return Status::OK(); } -Status ArrayColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) { - footer->set_next_array_item_ordinal(_item_writer->get_next_rowid()); - return Status::OK(); -} - Status ArrayColumnWriter::write_inverted_index() { if (_opts.inverted_index) { return _inverted_index_builder->finish(); @@ -1066,7 +1063,6 @@ Status MapColumnWriter::init() { } // here register_flush_page_callback to call this.put_extra_info_in_page() // when finish cur data page - _offsets_writer->register_flush_page_callback(this); for (auto& sub_writer : _kv_writers) { RETURN_IF_ERROR(sub_writer->init()); } @@ -1176,12 +1172,6 @@ Status MapColumnWriter::finish_current_page() { return Status::NotSupported("map writer has no data, can not finish_current_page"); } -// write this value for column reader to read according offsets -Status MapColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) { - footer->set_next_array_item_ordinal(_kv_writers[0]->get_next_rowid()); - return Status::OK(); -} - Status MapColumnWriter::write_inverted_index() { if (_opts.inverted_index) { return _inverted_index_builder->finish(); diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index 0056179cd39dca..1bc0afb972b245 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -255,7 +255,6 @@ class ScalarColumnWriter : public ColumnWriter { } Status _write_data_page(Page* page); - PageHead get_data_pages() { return _pages; } private: io::FileWriter* _file_writer = nullptr; @@ -278,18 +277,24 @@ class ScalarColumnWriter : public ColumnWriter { FlushPageCallback* _new_page_callback = nullptr; }; -class OffsetColumnWriter final : public ScalarColumnWriter { +// offsetColumnWriter is used column which has offset column, like array, map. +// column type is only uint64 and should response for whole column value [start, end], end will set +// in footer.next_array_item_ordinal which in finish_cur_page() callback put_extra_info_in_page() +class OffsetColumnWriter final : public ScalarColumnWriter, FlushPageCallback { public: OffsetColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, io::FileWriter* file_writer); ~OffsetColumnWriter() override; - // this method is used for pass next data when current page is full and next data need in extra - // info - Status finish_current_page_with_next_data(const uint8_t* next_data_ptr); + Status init() override; Status append_data(const uint8_t** ptr, size_t num_rows) override; + +private: + Status put_extra_info_in_page(DataPageFooterPB* footer) override; + + uint64_t _next_offset; }; class StructColumnWriter final : public ColumnWriter { @@ -344,7 +349,7 @@ class StructColumnWriter final : public ColumnWriter { ColumnWriterOptions _opts; }; -class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback { +class ArrayColumnWriter final : public ColumnWriter { public: explicit ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, OffsetColumnWriter* offset_writer, ScalarColumnWriter* null_writer, @@ -389,8 +394,6 @@ class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback { ordinal_t get_next_rowid() const override { return _offset_writer->get_next_rowid(); } private: - Status put_extra_info_in_page(DataPageFooterPB* header) override; - Status write_null_column(size_t num_rows, bool is_null); // 写入num_rows个null标记 bool has_empty_items() const { return _item_writer->get_next_rowid() == 0; } @@ -402,7 +405,7 @@ class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback { ColumnWriterOptions _opts; }; -class MapColumnWriter final : public ColumnWriter, public FlushPageCallback { +class MapColumnWriter final : public ColumnWriter { public: explicit MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, ScalarColumnWriter* null_writer, OffsetColumnWriter* offsets_writer, @@ -449,7 +452,6 @@ class MapColumnWriter final : public ColumnWriter, public FlushPageCallback { ordinal_t get_next_rowid() const override { return _offsets_writer->get_next_rowid(); } private: - Status put_extra_info_in_page(DataPageFooterPB* header) override; std::vector> _kv_writers; // we need null writer to make sure a row is null or not std::unique_ptr _null_writer; diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp index 181f1cd477212f..8dc14529701100 100644 --- a/be/src/vec/olap/olap_data_convertor.cpp +++ b/be/src/vec/olap/olap_data_convertor.cpp @@ -1011,8 +1011,8 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap( auto elem_size = end_offset - start_offset; _offsets.clear(); - _offsets.reserve(_num_rows); - for (int i = 0; i < _num_rows; ++i) { + _offsets.reserve(_num_rows + 1); + for (int i = 0; i <= _num_rows; ++i) { _offsets.push_back(column_map->offset_at(i + _row_pos) - start_offset + _base_offset); } _base_offset += elem_size; From 272bfe6eefb7b25c92e9f67202db7f9aba9192ce Mon Sep 17 00:00:00 2001 From: amorynan Date: Mon, 4 Sep 2023 21:33:54 +0800 Subject: [PATCH 5/6] add some note --- be/src/olap/rowset/segment_v2/column_writer.cpp | 1 + be/src/vec/olap/olap_data_convertor.cpp | 2 ++ 2 files changed, 3 insertions(+) diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index ae937ae5417616..019a552477ee17 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -756,6 +756,7 @@ Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { while (remaining > 0) { size_t num_written = remaining; RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written)); + // _next_offset after append_data_in_current_page is the offset of next data, which will used in finish_current_page() to set next_array_item_ordinal _next_offset = *(const uint64_t*)(*ptr); remaining -= num_written; diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp index 8dc14529701100..e7b59033c48de9 100644 --- a/be/src/vec/olap/olap_data_convertor.cpp +++ b/be/src/vec/olap/olap_data_convertor.cpp @@ -943,6 +943,7 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorArray::convert_to_olap( auto elem_size = end_offset - start_offset; _offsets.clear(); + // we need all offsets, so reserve num_rows + 1 to make sure last offset can be got in offset column, instead of according to nested item column _offsets.reserve(_num_rows + 1); for (int i = 0; i <= _num_rows; ++i) { _offsets.push_back(column_array->offset_at(i + _row_pos) - start_offset + _base_offset); @@ -1011,6 +1012,7 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap( auto elem_size = end_offset - start_offset; _offsets.clear(); + // we need all offsets, so reserve num_rows + 1 to make sure last offset can be got in offset column, instead of according to nested item column _offsets.reserve(_num_rows + 1); for (int i = 0; i <= _num_rows; ++i) { _offsets.push_back(column_map->offset_at(i + _row_pos) - start_offset + _base_offset); From fb0092c1ebb2fb5ab2fb123e36976c4df399520e Mon Sep 17 00:00:00 2001 From: amorynan Date: Tue, 5 Sep 2023 17:32:48 +0800 Subject: [PATCH 6/6] fix get field type --- be/src/olap/rowset/segment_v2/column_writer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index 019a552477ee17..7a446f112352d5 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -739,7 +739,7 @@ OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr field, io::FileWriter* file_writer) : ScalarColumnWriter(opts, std::move(field), file_writer) { // now we only explain data in offset column as uint64 - DCHECK(field->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT); + DCHECK(get_field()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT); } OffsetColumnWriter::~OffsetColumnWriter() = default;