Skip to content

Commit

Permalink
Storages: introduce inverted index file format & builder & viewer
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
Lloyd-Pottiger committed Mar 3, 2025
1 parent f8906bb commit 07ebbf2
Show file tree
Hide file tree
Showing 28 changed files with 1,591 additions and 206 deletions.
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class BitmapFilter
void set(BlockInputStreamPtr & stream);
// f[start, start+limit) = value
void set(UInt32 start, UInt32 limit, bool value = true);
void set(std::span<const UInt32> row_ids, const FilterPtr & f);
// If it returns true, all data matches and the filter is not filled.
bool get(IColumn::Filter & f, UInt32 start, UInt32 limit) const;
// Caller should ensure n in [0, size).
Expand All @@ -48,8 +49,6 @@ class BitmapFilter
friend class BitmapFilterView;

private:
void set(std::span<const UInt32> row_ids, const FilterPtr & f);

IColumn::Filter filter;
bool all_match;
};
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ add_headers_and_sources(delta_merge .)
add_headers_and_sources(delta_merge ./BitmapFilter)
add_headers_and_sources(delta_merge ./Index)
add_headers_and_sources(delta_merge ./Index/VectorIndex)
add_headers_and_sources(delta_merge ./Index/InvertedIndex)
add_headers_and_sources(delta_merge ./Filter)
add_headers_and_sources(delta_merge ./FilterParser)
add_headers_and_sources(delta_merge ./File)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyLocalIndexWriter.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Writer.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>
#include <Storages/DeltaMerge/Index/LocalIndexWriter.h>


namespace DB::ErrorCodes
Expand Down Expand Up @@ -94,7 +93,7 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
struct IndexToBuild
{
LocalIndexInfo info;
VectorIndexWriterInMemoryPtr builder_vector;
LocalIndexWriterPtr index_writer;
};

std::unordered_map<ColId, std::vector<IndexToBuild>> index_builders;
Expand All @@ -107,7 +106,7 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
RUNTIME_CHECK(index_info.def_vector_index != nullptr);
index_builders[index_info.column_id].emplace_back(IndexToBuild{
.info = index_info,
.builder_vector = {},
.index_writer = {},
});
}

Expand All @@ -125,17 +124,8 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
file->getDataPageId());

for (auto & index : indexes)
{
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
index.builder_vector = VectorIndexWriterInMemory::create(index.info.def_vector_index);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
}
index.index_writer = LocalIndexWriter::create(index.info, /*in_memory*/ false);

read_columns->push_back(*cd_iter);
}

Expand Down Expand Up @@ -171,16 +161,8 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
const auto & col = col_with_type_and_name.column;
for (const auto & index : index_builders[read_columns->at(col_idx).id])
{
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
RUNTIME_CHECK(index.builder_vector);
index.builder_vector->addBlock(*col, del_mark, should_proceed);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
RUNTIME_CHECK(index.index_writer);
index.index_writer->addBlock(*col, del_mark, should_proceed);
}
}
}
Expand All @@ -192,40 +174,16 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
const auto & cd = read_columns->at(col_idx);
for (const auto & index : index_builders[cd.id])
{
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
{
RUNTIME_CHECK(index.builder_vector);
auto index_page_id = options.storage_pool->newLogPageId();
MemoryWriteBuffer write_buf;
CompressedWriteBuffer compressed(write_buf);
index.builder_vector->finalize(compressed);
compressed.next();
auto data_size = write_buf.count();
auto buf = write_buf.tryGetReadBuffer();
// ColumnFileDataProviderRNLocalPageCache currently does not support read data with fields
options.wbs.log.putPage(index_page_id, 0, buf, data_size, {data_size});

auto idx_info = dtpb::ColumnFileIndexInfo{};
idx_info.set_index_page_id(index_page_id);
auto * idx_props = idx_info.mutable_index_props();
idx_props->set_kind(dtpb::IndexFileKind::VECTOR_INDEX);
idx_props->set_index_id(index.info.index_id);
idx_props->set_file_size(data_size);
auto * vector_index = idx_props->mutable_vector_index();
vector_index->set_format_version(0);
vector_index->set_dimensions(index.info.def_vector_index->dimension);
vector_index->set_distance_metric(
tipb::VectorDistanceMetric_Name(index.info.def_vector_index->distance_metric));
index_infos->emplace_back(std::move(idx_info));

break;
}
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
RUNTIME_CHECK(index.index_writer);
auto index_page_id = options.storage_pool->newLogPageId();
MemoryWriteBuffer write_buf;
CompressedWriteBuffer compressed(write_buf);
auto idx_info = index.index_writer->finalize(compressed, index_page_id);
auto data_size = write_buf.count();
auto buf = write_buf.tryGetReadBuffer();
// ColumnFileDataProviderRNLocalPageCache currently does not support reading data with fields
options.wbs.log.putPage(index_page_id, 0, buf, data_size, {data_size});
index_infos->emplace_back(std::move(idx_info));
}
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ class DMFile : private boost::noncopyable
{
case TiDB::ColumnarIndexKind::Vector:
return fmt::format("idx_{}.vector", index_id);
case TiDB::ColumnarIndexKind::Inverted:
return fmt::format("idx_{}.inverted", index_id);
default:
throw Exception(fmt::format("Unsupported index kind: {}", magic_enum::enum_name(kind)));
}
Expand Down
61 changes: 8 additions & 53 deletions dbms/src/Storages/DeltaMerge/File/DMFileLocalIndexWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
#include <Storages/DeltaMerge/File/DMFileLocalIndexWriter.h>
#include <Storages/DeltaMerge/File/DMFileV3IncrementWriter.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Writer.h>
#include <Storages/DeltaMerge/Index/LocalIndexWriter.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>
#include <Storages/PathPool.h>
#include <tipb/executor.pb.h>

#include <unordered_map>

Expand Down Expand Up @@ -116,7 +114,7 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab
LocalIndexInfo info;
String index_file_path; // For write out
String index_file_name; // For meta include
VectorIndexWriterOnDiskPtr builder_vector;
LocalIndexWriterPtr index_writer;
};

std::unordered_map<ColId, std::vector<IndexToBuild>> index_builders;
Expand All @@ -127,7 +125,7 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab
.info = index_info,
.index_file_path = "",
.index_file_name = "",
.builder_vector = {},
.index_writer = {},
});
}

Expand Down Expand Up @@ -156,17 +154,7 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab
index.info.column_id,
index.info.index_id);

switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
index.builder_vector = VectorIndexWriterOnDisk::create( //
index.index_file_path,
index.info.def_vector_index);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
index.index_writer = LocalIndexWriter::create(index.info, /*in_memory*/ false);
}
read_columns.push_back(*cd_iter);
}
Expand Down Expand Up @@ -213,16 +201,8 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab
const auto & col = col_with_type_and_name.column;
for (const auto & index : index_builders[read_columns[col_idx].id])
{
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
RUNTIME_CHECK(index.builder_vector);
index.builder_vector->addBlock(*col, del_mark, should_proceed);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}
RUNTIME_CHECK(index.index_writer);
index.index_writer->addBlock(*col, del_mark, should_proceed);
}
}
}
Expand All @@ -240,33 +220,8 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab

for (const auto & index : index_builders[cd.id])
{
dtpb::DMFileIndexInfo pb_dmfile_idx{};
auto * pb_idx = pb_dmfile_idx.mutable_index_props();

switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
{
index.builder_vector->finalize();
auto * pb_vec_idx = pb_idx->mutable_vector_index();
pb_vec_idx->set_format_version(0);
pb_vec_idx->set_dimensions(index.info.def_vector_index->dimension);
pb_vec_idx->set_distance_metric(
tipb::VectorDistanceMetric_Name(index.info.def_vector_index->distance_metric));
break;
}
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
break;
}

auto index_file = Poco::File(index.index_file_path);
RUNTIME_CHECK(index_file.exists());
pb_idx->set_kind(index.info.getKindAsDtpb());
pb_idx->set_index_id(index.info.index_id);
pb_idx->set_file_size(index_file.getSize());

total_built_index_bytes += pb_idx->file_size();
auto pb_dmfile_idx = index.index_writer->finalize(index.index_file_path);
total_built_index_bytes += pb_dmfile_idx.index_props().file_size();
new_indexes.emplace_back(std::move(pb_dmfile_idx));
iw->include(index.index_file_name);
}
Expand Down
Loading

0 comments on commit 07ebbf2

Please sign in to comment.