Skip to content

Commit

Permalink
Storage: Refine VectorIndexHNSW -> VectorIndex (#9932)
Browse files Browse the repository at this point in the history
ref #9032

Signed-off-by: Wish <[email protected]>
  • Loading branch information
breezewish authored Mar 3, 2025
1 parent d6de212 commit f8906bb
Show file tree
Hide file tree
Showing 37 changed files with 492 additions and 2,757 deletions.
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ ccache.tar
# https://clang.llvm.org/docs/JSONCompilationDatabase.html
compile_commands.json

# git patch reject report
*.rej

# vim cache files
*.swp

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ add_subdirectory(./Remote/Proto)
add_headers_and_sources(delta_merge .)
add_headers_and_sources(delta_merge ./BitmapFilter)
add_headers_and_sources(delta_merge ./Index)
add_headers_and_sources(delta_merge ./Index/VectorIndexHNSW)
add_headers_and_sources(delta_merge ./Index/VectorIndex)
add_headers_and_sources(delta_merge ./Filter)
add_headers_and_sources(delta_merge ./FilterParser)
add_headers_and_sources(delta_merge ./File)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,14 @@ Block ColumnFileSetWithVectorIndexInputStream::read()
return {};
}

std::vector<VectorIndexViewer::SearchResult> ColumnFileSetWithVectorIndexInputStream::load()
std::vector<VectorIndexReader::SearchResult> ColumnFileSetWithVectorIndexInputStream::load()
{
if (loaded)
return {};

tiny_readers.reserve(column_files.size());
UInt32 precedes_rows = 0;
std::vector<VectorIndexViewer::SearchResult> search_results;
std::vector<VectorIndexReader::SearchResult> search_results;
for (const auto & column_file : column_files)
{
if (auto * tiny_file = column_file->tryToTinyFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ColumnFileSetWithVectorIndexInputStream : public VectorIndexBlockInputStre
const ColumnDefinesPtr rest_col_defs;

// Set after load(). Top K search results in files with vector index.
std::vector<VectorIndexViewer::Key> sorted_results;
std::vector<VectorIndexReader::Key> sorted_results;
std::vector<ColumnFileTinyVectorIndexReaderPtr> tiny_readers;

const ColumnFiles & column_files;
Expand Down Expand Up @@ -94,7 +94,7 @@ class ColumnFileSetWithVectorIndexInputStream : public VectorIndexBlockInputStre

Block read() override;

std::vector<VectorIndexViewer::SearchResult> load() override;
std::vector<VectorIndexReader::SearchResult> load() override;

void setSelectedRows(const std::span<const UInt32> & selected_rows) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyLocalIndexWriter.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyReader.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Writer.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>


Expand Down Expand Up @@ -94,7 +94,7 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
struct IndexToBuild
{
LocalIndexInfo info;
VectorIndexBuilderPtr builder_vector;
VectorIndexWriterInMemoryPtr builder_vector;
};

std::unordered_map<ColId, std::vector<IndexToBuild>> index_builders;
Expand Down Expand Up @@ -129,7 +129,7 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
index.builder_vector = VectorIndexBuilder::create(index.info.def_vector_index);
index.builder_vector = VectorIndexWriterInMemory::create(index.info.def_vector_index);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
Expand Down Expand Up @@ -200,7 +200,7 @@ ColumnFileTinyPtr ColumnFileTinyLocalIndexWriter::buildIndexForFile(
auto index_page_id = options.storage_pool->newLogPageId();
MemoryWriteBuffer write_buf;
CompressedWriteBuffer compressed(write_buf);
index.builder_vector->saveToBuffer(compressed);
index.builder_vector->finalize(compressed);
compressed.next();
auto data_size = write_buf.count();
auto buf = write_buf.tryGetReadBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h>
#include <Storages/DeltaMerge/Index/LocalIndexCache.h>
#include <Storages/DeltaMerge/Index/VectorSearchPerf.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Perf.h>


namespace DB::DM
{

void ColumnFileTinyVectorIndexReader::read(
MutableColumnPtr & vec_column,
const std::span<const VectorIndexViewer::Key> & read_rowids,
const std::span<const VectorIndexReader::Key> & read_rowids,
size_t rowid_start_offset)
{
RUNTIME_CHECK(loaded);
Expand All @@ -45,7 +46,7 @@ void ColumnFileTinyVectorIndexReader::read(
perf_stat.read_vec_column_seconds = watch.elapsedSeconds();
}

std::vector<VectorIndexViewer::SearchResult> ColumnFileTinyVectorIndexReader::load()
std::vector<VectorIndexReader::SearchResult> ColumnFileTinyVectorIndexReader::load()
{
if (loaded)
return {};
Expand Down Expand Up @@ -82,13 +83,13 @@ void ColumnFileTinyVectorIndexReader::loadVectorIndex()
auto index_page = data_provider->readTinyData(index_page_id, index_fields);
ReadBufferFromOwnString read_buf(index_page.data);
CompressedReadBuffer compressed(read_buf);
return VectorIndexViewer::load(index_info_iter->index_props().vector_index(), compressed);
return VectorIndexReader::createFromMemory(index_info_iter->index_props().vector_index(), compressed);
};
if (local_index_cache)
{
const auto key = fmt::format("{}{}", LocalIndexCache::COLUMNFILETINY_INDEX_NAME_PREFIX, index_page_id);
auto local_index = local_index_cache->getOrSet(key, load_from_page_storage);
vec_index = std::dynamic_pointer_cast<VectorIndexViewer>(local_index);
vec_index = std::dynamic_pointer_cast<VectorIndexReader>(local_index);
}
else
vec_index = load_from_page_storage();
Expand All @@ -115,7 +116,7 @@ ColumnFileTinyVectorIndexReader::~ColumnFileTinyVectorIndexReader()
perf_stat.returned_rows);
}

std::vector<VectorIndexViewer::SearchResult> ColumnFileTinyVectorIndexReader::loadVectorSearchResult()
std::vector<VectorIndexReader::SearchResult> ColumnFileTinyVectorIndexReader::loadVectorSearchResult()
{
auto perf_begin = PerfContext::vector_search;
RUNTIME_CHECK(valid_rows.size() == tiny_file.getRows(), valid_rows.size(), tiny_file.getRows());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

#include <Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFile.h>
#include <Storages/DeltaMerge/Index/LocalIndex_fwd.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/Index/LocalIndexCache_fwd.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Reader.h>


namespace DB::DM
Expand All @@ -31,7 +31,7 @@ class ColumnFileTinyVectorIndexReader

const ANNQueryInfoPtr ann_query_info;
// Set after load().
VectorIndexViewerPtr vec_index;
VectorIndexReaderPtr vec_index;
const BitmapFilterView valid_rows;
// Note: ColumnDefine comes from read path does not have vector_index fields.
const ColumnDefine vec_cd;
Expand Down Expand Up @@ -77,16 +77,16 @@ class ColumnFileTinyVectorIndexReader
// Read vector column data with the specified rowids.
void read(
MutableColumnPtr & vec_column,
const std::span<const VectorIndexViewer::Key> & read_rowids,
const std::span<const VectorIndexReader::Key> & read_rowids,
size_t rowid_start_offset);

// Load vector index and search results.
// Return the rowids of the selected rows.
std::vector<VectorIndexViewer::SearchResult> load();
std::vector<VectorIndexReader::SearchResult> load();

private:
void loadVectorIndex();
std::vector<VectorIndexViewer::SearchResult> loadVectorSearchResult();
std::vector<VectorIndexReader::SearchResult> loadVectorSearchResult();
};

using ColumnFileTinyVectorIndexReaderPtr = std::shared_ptr<ColumnFileTinyVectorIndexReader>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ void ConcatVectorIndexBlockInputStream::load()
UInt32 precedes_rows = 0;
// otherwise the `row.key` of the search result is not correct
assert(stream->children.size() == index_streams.size());
std::vector<VectorIndexViewer::SearchResult> search_results;
std::vector<VectorIndexReader::SearchResult> search_results;
for (size_t i = 0; i < stream->children.size(); ++i)
{
if (auto * index_stream = index_streams[i]; index_stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Reader.h>
#include <Storages/DeltaMerge/ScanContext.h>

namespace DB::DM
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Storages/DeltaMerge/File/ColumnCacheLongTerm_fwd.h>
#include <Storages/DeltaMerge/File/DMFileReader.h>
#include <Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream_fwd.h>
#include <Storages/DeltaMerge/Index/LocalIndexCache_fwd.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReader.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileLocalIndexWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <Storages/DeltaMerge/File/DMFileLocalIndexWriter.h>
#include <Storages/DeltaMerge/File/DMFileV3IncrementWriter.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Writer.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>
#include <Storages/PathPool.h>
Expand Down Expand Up @@ -116,7 +116,7 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab
LocalIndexInfo info;
String index_file_path; // For write out
String index_file_name; // For meta include
VectorIndexBuilderPtr builder_vector;
VectorIndexWriterOnDiskPtr builder_vector;
};

std::unordered_map<ColId, std::vector<IndexToBuild>> index_builders;
Expand Down Expand Up @@ -159,7 +159,9 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab
switch (index.info.kind)
{
case TiDB::ColumnarIndexKind::Vector:
index.builder_vector = VectorIndexBuilder::create(index.info.def_vector_index);
index.builder_vector = VectorIndexWriterOnDisk::create( //
index.index_file_path,
index.info.def_vector_index);
break;
default:
RUNTIME_CHECK_MSG(false, "Unsupported index kind: {}", magic_enum::enum_name(index.info.kind));
Expand Down Expand Up @@ -245,7 +247,7 @@ size_t DMFileLocalIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutab
{
case TiDB::ColumnarIndexKind::Vector:
{
index.builder_vector->saveToFile(index.index_file_path);
index.builder_vector->finalize();
auto * pb_vec_idx = pb_idx->mutable_vector_index();
pb_vec_idx->set_format_version(0);
pb_vec_idx->set_dimensions(index.info.def_vector_index->dimension);
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFileVectorIndexReader.h>
#include <Storages/DeltaMerge/Index/LocalIndexCache.h>
#include <Storages/DeltaMerge/Index/VectorSearchPerf.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Perf.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/S3/FileCache.h>
#include <Storages/S3/FileCachePerf.h>
Expand All @@ -42,7 +42,7 @@ String DMFileVectorIndexReader::PerfStat::toString() const
duration_read_vec_column);
}

std::vector<VectorIndexViewer::SearchResult> DMFileVectorIndexReader::load()
std::vector<VectorIndexReader::SearchResult> DMFileVectorIndexReader::load()
{
if (loaded)
return {};
Expand Down Expand Up @@ -126,7 +126,7 @@ void DMFileVectorIndexReader::loadVectorIndex()

auto load_from_file = [&]() {
perf_stat.has_load_from_file = true;
return VectorIndexViewer::view(vector_index->index_props().vector_index(), local_index_file_path);
return VectorIndexReader::createFromMmap(vector_index->index_props().vector_index(), local_index_file_path);
};

Stopwatch watch;
Expand All @@ -136,7 +136,7 @@ void DMFileVectorIndexReader::loadVectorIndex()
// will check whether file is still valid and try to remove memory references
// when file is dropped.
auto local_index = local_index_cache->getOrSet(local_index_file_path, load_from_file);
vec_index = std::dynamic_pointer_cast<VectorIndexViewer>(local_index);
vec_index = std::dynamic_pointer_cast<VectorIndexReader>(local_index);
}
else
{
Expand Down Expand Up @@ -174,7 +174,7 @@ String DMFileVectorIndexReader::perfStat() const
perf_stat.selected_nodes);
}

std::vector<VectorIndexViewer::SearchResult> DMFileVectorIndexReader::loadVectorSearchResult()
std::vector<VectorIndexReader::SearchResult> DMFileVectorIndexReader::loadVectorSearchResult()
{
Stopwatch watch;

Expand Down Expand Up @@ -207,7 +207,7 @@ std::vector<VectorIndexViewer::SearchResult> DMFileVectorIndexReader::loadVector

void DMFileVectorIndexReader::read(
MutableColumnPtr & vec_column,
const std::span<const VectorIndexViewer::Key> & selected_rows)
const std::span<const VectorIndexReader::Key> & selected_rows)
{
Stopwatch watch;
RUNTIME_CHECK(loaded);
Expand Down
13 changes: 6 additions & 7 deletions dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/DeltaMerge/Index/LocalIndex_fwd.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache_fwd.h>
#include <Storages/DeltaMerge/Index/LocalIndexCache_fwd.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Reader.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>

namespace DB::DM
Expand Down Expand Up @@ -52,7 +51,7 @@ class DMFileVectorIndexReader
PerfStat perf_stat;

// Set after load().
VectorIndexViewerPtr vec_index = nullptr;
VectorIndexReaderPtr vec_index = nullptr;
bool loaded = false;

public:
Expand All @@ -73,17 +72,17 @@ class DMFileVectorIndexReader
~DMFileVectorIndexReader();

// Read vector column data with the specified rowids.
void read(MutableColumnPtr & vec_column, const std::span<const VectorIndexViewer::Key> & selected_rows);
void read(MutableColumnPtr & vec_column, const std::span<const VectorIndexReader::Key> & selected_rows);

// Load vector index and search results.
// Return the rowids of the selected rows.
std::vector<VectorIndexViewer::SearchResult> load();
std::vector<VectorIndexReader::SearchResult> load();

String perfStat() const;

private:
void loadVectorIndex();
std::vector<VectorIndexViewer::SearchResult> loadVectorSearchResult();
std::vector<VectorIndexReader::SearchResult> loadVectorSearchResult();
};

using DMFileVectorIndexReaderPtr = std::shared_ptr<DMFileVectorIndexReader>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ Block DMFileWithVectorIndexBlockInputStream::read()
return block;
}

std::vector<VectorIndexViewer::SearchResult> DMFileWithVectorIndexBlockInputStream::load()
std::vector<VectorIndexReader::SearchResult> DMFileWithVectorIndexBlockInputStream::load()
{
if (loaded)
return {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <Storages/DeltaMerge/File/DMFileReader.h>
#include <Storages/DeltaMerge/File/DMFileVectorIndexReader.h>
#include <Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream_fwd.h>
#include <Storages/DeltaMerge/Index/LocalIndex_fwd.h>
#include <Storages/DeltaMerge/Index/LocalIndexCache_fwd.h>
#include <Storages/DeltaMerge/VectorIndexBlockInputStream.h>


Expand Down Expand Up @@ -96,7 +96,7 @@ class DMFileWithVectorIndexBlockInputStream : public VectorIndexBlockInputStream

Block getHeader() const override { return header; }

std::vector<VectorIndexViewer::SearchResult> load() override;
std::vector<VectorIndexReader::SearchResult> load() override;

void setSelectedRows(const std::span<const UInt32> & selected_rows) override;

Expand All @@ -123,7 +123,7 @@ class DMFileWithVectorIndexBlockInputStream : public VectorIndexBlockInputStream
const DMFileVectorIndexReaderPtr vec_index_reader;

// Set after load().
VectorIndexViewerPtr vec_index = nullptr;
VectorIndexReaderPtr vec_index = nullptr;
// VectorColumnFromIndexReaderPtr vec_column_reader = nullptr;
// Set after load(). Used to filter the output rows.
std::vector<UInt32> sorted_results{}; // Key is rowid
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Reader_fwd.h>

namespace DB
{
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/Filter/RSOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <Common/FieldVisitors.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/Filter/RSOperator_fwd.h>
#include <Storages/DeltaMerge/Index/LocalIndex_fwd.h>
#include <Storages/DeltaMerge/Index/RSIndex.h>
#include <Storages/DeltaMerge/Index/RSResult.h>

Expand Down
Loading

0 comments on commit f8906bb

Please sign in to comment.