Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX](array/map)fix array map batch append data with right next_array_item_rowid #23779

Merged
merged 8 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 46 additions & 17 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
length_column.set_index_length(-1); // no short key index
std::unique_ptr<Field> bigint_field(FieldFactory::create(length_column));
auto* length_writer =
new ScalarColumnWriter(length_options, std::move(bigint_field), file_writer);
new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer);

// if nullable, create null writer
ScalarColumnWriter* null_writer = nullptr;
Expand Down Expand Up @@ -314,7 +314,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
length_column.set_index_length(-1); // no short key index
std::unique_ptr<Field> bigint_field(FieldFactory::create(length_column));
auto* length_writer =
new ScalarColumnWriter(length_options, std::move(bigint_field), file_writer);
new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer);

// create null writer
if (opts.meta->is_nullable()) {
Expand Down Expand Up @@ -731,6 +731,48 @@ Status ScalarColumnWriter::finish_current_page() {

////////////////////////////////////////////////////////////////////////////////

////////////////////////////////////////////////////////////////////////////////
// offset column writer
////////////////////////////////////////////////////////////////////////////////

// Builds an offset writer on top of ScalarColumnWriter, forwarding all
// arguments (ownership of `field` is handed to the base class).
OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts,
                                       std::unique_ptr<Field> field, io::FileWriter* file_writer)
        : ScalarColumnWriter(opts, std::move(field), file_writer) {
    // Offsets are currently interpreted only as uint64 values, so the backing
    // field is expected to be an unsigned bigint (checked in debug builds).
    DCHECK(get_field()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT);
}

// Out-of-line defaulted destructor: no cleanup beyond the default member/base teardown.
OffsetColumnWriter::~OffsetColumnWriter() = default;

// Initializes the underlying scalar writer first (propagating any error),
// then registers this object as the flush-page callback and resets the
// tracked next offset to zero.
Status OffsetColumnWriter::init() {
    RETURN_IF_ERROR(ScalarColumnWriter::init());

    // Register ourselves so put_extra_info_in_page() can contribute to the
    // page footer (presumably invoked when a page is finished -- confirm in
    // ScalarColumnWriter::finish_current_page()).
    register_flush_page_callback(this);

    _next_offset = 0;
    return Status::OK();
}

// Appends `num_rows` offsets read through `*ptr`, spilling into new pages as
// the current page fills up.
//
// After each partial append, the value `*ptr` points at is remembered in
// _next_offset; put_extra_info_in_page() later writes it to the footer as
// next_array_item_ordinal.
//
// NOTE(review): this reads one uint64 at `*ptr` even after the final row has
// been consumed, so the caller presumably supplies num_rows + 1 offsets --
// confirm against the data convertor.
//
// Returns the first non-OK status from the page append / page finish calls,
// otherwise Status::OK().
Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
    for (size_t rows_left = num_rows; rows_left > 0;) {
        // In: how many rows we would like to write; out: how many were written.
        size_t rows_appended = rows_left;
        RETURN_IF_ERROR(append_data_in_current_page(ptr, &rows_appended));

        // Presumably `*ptr` has been advanced past the rows just written, so it
        // now addresses the offset of the next data; keep it for the footer.
        _next_offset = *reinterpret_cast<const uint64_t*>(*ptr);
        rows_left -= rows_appended;

        // Close the page once it is full so the next iteration starts a new one.
        if (_page_builder->is_page_full()) {
            RETURN_IF_ERROR(finish_current_page());
        }
    }
    return Status::OK();
}

// Flush-page callback: records the most recently tracked next offset in the
// page footer's next_array_item_ordinal field. Always returns Status::OK().
Status OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
    const uint64_t next_item_ordinal = _next_offset;
    footer->set_next_array_item_ordinal(next_item_ordinal);
    return Status::OK();
}

StructColumnWriter::StructColumnWriter(
const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* null_writer,
Expand Down Expand Up @@ -853,7 +895,7 @@ Status StructColumnWriter::finish_current_page() {
////////////////////////////////////////////////////////////////////////////////

ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* offset_writer,
OffsetColumnWriter* offset_writer,
ScalarColumnWriter* null_writer,
std::unique_ptr<ColumnWriter> item_writer)
: ColumnWriter(std::move(field), opts.meta->is_nullable()),
Expand All @@ -871,7 +913,6 @@ Status ArrayColumnWriter::init() {
RETURN_IF_ERROR(_null_writer->init());
}
RETURN_IF_ERROR(_item_writer->init());
_offset_writer->register_flush_page_callback(this);
if (_opts.inverted_index) {
auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
if (writer != nullptr) {
Expand All @@ -885,11 +926,6 @@ Status ArrayColumnWriter::init() {
return Status::OK();
}

// Flush-page callback: stores the item writer's next row id in the page
// footer's next_array_item_ordinal field. Always returns Status::OK().
Status ArrayColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
    const auto next_item_rowid = _item_writer->get_next_rowid();
    footer->set_next_array_item_ordinal(next_item_rowid);
    return Status::OK();
}

Status ArrayColumnWriter::write_inverted_index() {
if (_opts.inverted_index) {
return _inverted_index_builder->finish();
Expand Down Expand Up @@ -1008,7 +1044,7 @@ Status ArrayColumnWriter::finish_current_page() {

/// ============================= MapColumnWriter =====================////
MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* null_writer, ScalarColumnWriter* offset_writer,
ScalarColumnWriter* null_writer, OffsetColumnWriter* offset_writer,
std::vector<std::unique_ptr<ColumnWriter>>& kv_writers)
: ColumnWriter(std::move(field), opts.meta->is_nullable()), _opts(opts) {
CHECK_EQ(kv_writers.size(), 2);
Expand All @@ -1028,7 +1064,6 @@ Status MapColumnWriter::init() {
}
// here register_flush_page_callback to call this.put_extra_info_in_page()
// when finish cur data page
_offsets_writer->register_flush_page_callback(this);
for (auto& sub_writer : _kv_writers) {
RETURN_IF_ERROR(sub_writer->init());
}
Expand Down Expand Up @@ -1138,12 +1173,6 @@ Status MapColumnWriter::finish_current_page() {
return Status::NotSupported("map writer has no data, can not finish_current_page");
}

// Flush-page callback: stores the next row id of the first key/value writer
// (_kv_writers[0]) in the page footer's next_array_item_ordinal field, which
// the column reader uses when reading by offsets. Always returns Status::OK().
Status MapColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
    const auto next_item_rowid = _kv_writers[0]->get_next_rowid();
    footer->set_next_array_item_ordinal(next_item_rowid);
    return Status::OK();
}

Status MapColumnWriter::write_inverted_index() {
if (_opts.inverted_index) {
return _inverted_index_builder->finish();
Expand Down
38 changes: 28 additions & 10 deletions be/src/olap/rowset/segment_v2/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class FlushPageCallback {
// Because some columns would be stored in a file, we should wait
// until all columns has been finished, and then data can be written
// to file
class ScalarColumnWriter final : public ColumnWriter {
class ScalarColumnWriter : public ColumnWriter {
public:
ScalarColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
io::FileWriter* file_writer);
Expand Down Expand Up @@ -208,6 +208,7 @@ class ScalarColumnWriter final : public ColumnWriter {

Status append_data_in_current_page(const uint8_t* ptr, size_t* num_written);
friend class ArrayColumnWriter;
friend class OffsetColumnWriter;

private:
std::unique_ptr<PageBuilder> _page_builder;
Expand Down Expand Up @@ -276,6 +277,26 @@ class ScalarColumnWriter final : public ColumnWriter {
FlushPageCallback* _new_page_callback = nullptr;
};

// OffsetColumnWriter writes the offset column of types that carry one, such
// as array and map. Offsets are interpreted as uint64 only. The writer is
// responsible for the whole value range [start, end] of a page: `end` is
// stored in the page footer's next_array_item_ordinal by
// put_extra_info_in_page(), the FlushPageCallback hook this class implements.
//
// FlushPageCallback is a private base (made explicit here; it was already
// private by default for a `class`), so only the writer itself can register
// the callback.
class OffsetColumnWriter final : public ScalarColumnWriter, private FlushPageCallback {
public:
    OffsetColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
                       io::FileWriter* file_writer);

    ~OffsetColumnWriter() override;

    // Initializes the underlying scalar writer, registers this object as the
    // flush-page callback and resets the tracked offset.
    Status init() override;

    // Appends num_rows offsets read through *ptr, remembering the offset that
    // follows the appended data for use in the page footer.
    Status append_data(const uint8_t** ptr, size_t num_rows) override;

private:
    // FlushPageCallback hook: writes _next_offset into
    // footer->next_array_item_ordinal.
    Status put_extra_info_in_page(DataPageFooterPB* footer) override;

    // Offset of the data following the last append. Zero-initialized so the
    // member never holds an indeterminate value before init() runs.
    uint64_t _next_offset = 0;
};

class StructColumnWriter final : public ColumnWriter {
public:
explicit StructColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
Expand Down Expand Up @@ -328,10 +349,10 @@ class StructColumnWriter final : public ColumnWriter {
ColumnWriterOptions _opts;
};

class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback {
class ArrayColumnWriter final : public ColumnWriter {
public:
explicit ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* offset_writer, ScalarColumnWriter* null_writer,
OffsetColumnWriter* offset_writer, ScalarColumnWriter* null_writer,
std::unique_ptr<ColumnWriter> item_writer);
~ArrayColumnWriter() override = default;

Expand Down Expand Up @@ -373,22 +394,21 @@ class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback {
ordinal_t get_next_rowid() const override { return _offset_writer->get_next_rowid(); }

private:
Status put_extra_info_in_page(DataPageFooterPB* header) override;
Status write_null_column(size_t num_rows, bool is_null); // 写入num_rows个null标记
bool has_empty_items() const { return _item_writer->get_next_rowid() == 0; }

private:
std::unique_ptr<ScalarColumnWriter> _offset_writer;
std::unique_ptr<OffsetColumnWriter> _offset_writer;
std::unique_ptr<ScalarColumnWriter> _null_writer;
std::unique_ptr<ColumnWriter> _item_writer;
std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder;
ColumnWriterOptions _opts;
};

class MapColumnWriter final : public ColumnWriter, public FlushPageCallback {
class MapColumnWriter final : public ColumnWriter {
public:
explicit MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* null_writer, ScalarColumnWriter* offsets_writer,
ScalarColumnWriter* null_writer, OffsetColumnWriter* offsets_writer,
std::vector<std::unique_ptr<ColumnWriter>>& _kv_writers);

~MapColumnWriter() override = default;
Expand Down Expand Up @@ -432,12 +452,10 @@ class MapColumnWriter final : public ColumnWriter, public FlushPageCallback {
ordinal_t get_next_rowid() const override { return _offsets_writer->get_next_rowid(); }

private:
Status put_extra_info_in_page(DataPageFooterPB* header) override;

std::vector<std::unique_ptr<ColumnWriter>> _kv_writers;
// we need null writer to make sure a row is null or not
std::unique_ptr<ScalarColumnWriter> _null_writer;
std::unique_ptr<ScalarColumnWriter> _offsets_writer;
std::unique_ptr<OffsetColumnWriter> _offsets_writer;
std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder;
ColumnWriterOptions _opts;
};
Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/olap/olap_data_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,7 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorArray::convert_to_olap(
auto elem_size = end_offset - start_offset;

_offsets.clear();
// We need every offset, so reserve _num_rows + 1 entries: the last offset can then be read from the offset column itself instead of being derived from the nested item column.
_offsets.reserve(_num_rows + 1);
for (int i = 0; i <= _num_rows; ++i) {
_offsets.push_back(column_array->offset_at(i + _row_pos) - start_offset + _base_offset);
Expand Down Expand Up @@ -1011,8 +1012,9 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
auto elem_size = end_offset - start_offset;

_offsets.clear();
_offsets.reserve(_num_rows);
for (int i = 0; i < _num_rows; ++i) {
// We need every offset, so reserve _num_rows + 1 entries: the last offset can then be read from the offset column itself instead of being derived from the nested item column.
_offsets.reserve(_num_rows + 1);
for (int i = 0; i <= _num_rows; ++i) {
_offsets.push_back(column_map->offset_at(i + _row_pos) - start_offset + _base_offset);
}
_base_offset += elem_size;
Expand Down