external indexing quantization #151

Merged · 6 commits · Sep 27, 2024

Changes from all commits:
2 changes: 1 addition & 1 deletion .github/workflows/publish-cli-docker.yaml
@@ -11,7 +11,7 @@ on:
type: string
description: "CLI version"
required: true
default: "0.3.25"
default: "0.3.26"
IMAGE_NAME:
type: string
description: "Container image name to tag"
5 changes: 4 additions & 1 deletion Dockerfile.cli
@@ -8,12 +8,15 @@ ENV RUSTFLAGS=$RUSTFLAGS
ARG CXXFLAGS=""
ENV CXXFLAGS=$CXXFLAGS

+ENV CC=/usr/bin/clang-18
+ENV CXX=/usr/bin/clang++-18
# Remove lantern_extras from workspace
RUN rm -rf lantern_extras && sed -i -e 's/"lantern_extras",//' Cargo.toml

# Build your program for release
RUN apt update && \
-apt install -y --no-install-recommends wget build-essential pkg-config clang curl libssl-dev && \
+apt install -y --no-install-recommends lsb-release wget software-properties-common gnupg pkg-config curl libssl-dev && \
+curl -s https://apt.llvm.org/llvm.sh | bash -s -- 18 && \
cargo build --release --package lantern_cli

FROM debian:12
5 changes: 4 additions & 1 deletion Dockerfile.cli.cuda
@@ -6,9 +6,12 @@ COPY . .
# Remove lantern_extras from workspace
RUN rm -rf lantern_extras && sed -i -e 's/"lantern_extras",//' Cargo.toml

+ENV CC=/usr/bin/clang-18
+ENV CXX=/usr/bin/clang++-18
# Build your program for release
RUN apt update && \
-apt install -y --no-install-recommends wget build-essential pkg-config clang curl libssl-dev && \
+apt install -y --no-install-recommends lsb-release wget software-properties-common gnupg pkg-config curl libssl-dev && \
+curl -s https://apt.llvm.org/llvm.sh | bash -s -- 18 && \
cargo build --release --package lantern_cli

FROM nvcr.io/nvidia/cuda:11.8.0-runtime-ubuntu22.04
7 changes: 3 additions & 4 deletions lantern_cli/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "lantern_cli"
version = "0.3.25"
version = "0.3.26"
edition = "2021"

[[bin]]
@@ -40,7 +40,7 @@ strum = { version = "0.25", features = ["derive"] }
tiktoken-rs = "0.5.8"
regex = "1.10.3"
postgres-types = { version = "0.2.6", features = ["derive"] }
-usearch = { git = "https://github.com/Ngalstyan4/usearch.git", rev = "de8580c7e4f8d43d878905a7c81ec65fe0d5a2db" }
+usearch = { git = "https://github.com/Ngalstyan4/usearch.git", rev = "aa4f91d21230fd611b6c7741fa06be8c20acc9a9" }
actix-web = { version = "4.5.1", optional = true }
env_logger = { version = "0.11.2", optional = true }
deadpool-postgres = { version = "0.12.1", optional = true }
@@ -50,7 +50,6 @@ utoipa = { version = "4.2.0", optional = true}
utoipa-swagger-ui = { version = "6.0.0", features = ["actix-web"], optional = true }
actix-web-httpauth = { version = "0.8.1", optional = true }
tokio-util = "0.7.11"
-byteorder = { version="1.5.0", optional=true }
bitvec = { version="1.0.1", optional=true }
rustls = { version="0.23.12", optional=true }
rustls-pemfile = { version="2.1.2", optional=true }
@@ -64,7 +63,7 @@ autotune = []
pq = ["dep:gcp_auth", "dep:linfa", "dep:linfa-clustering", "dep:md5", "dep:rayon"]
cli = []
external-index = []
-external-index-server = ["dep:byteorder", "dep:bitvec", "dep:rustls", "dep:rustls-pemfile", "dep:glob"]
+external-index-server = ["dep:bitvec", "dep:rustls", "dep:rustls-pemfile", "dep:glob"]
embeddings = ["dep:bytes"]

[lib]
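The byteorder drop above works because the standard library has shipped the needed conversions since Rust 1.32, covering every little-endian read this server does. A minimal std-only sketch of the two patterns server.rs relies on below (helper names here are illustrative, not part of the codebase):

    // Read one little-endian u32, e.g. a protocol header field.
    fn read_u32_le(buf: &[u8]) -> u32 {
        u32::from_le_bytes(buf[0..4].try_into().expect("need at least 4 bytes"))
    }

    // Decode a packed little-endian f32 vector, as bytes_to_f32_vec_le does below.
    fn read_f32_vec_le(bytes: &[u8]) -> Vec<f32> {
        bytes
            .chunks_exact(4)
            .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
            .collect()
    }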
121 changes: 46 additions & 75 deletions lantern_cli/src/external_index/server.rs
@@ -1,8 +1,5 @@
use super::cli::{IndexServerArgs, UMetricKind};
-use bitvec::prelude::*;
-use byteorder::{ByteOrder, LittleEndian};
use glob::glob;
-use itertools::Itertools;
use rand::Rng;
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls::{ServerConfig, ServerConnection, StreamOwned};
@@ -15,7 +12,7 @@ use std::path::Path;
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
-use usearch::ffi::{IndexOptions, MetricKind, ScalarKind};
+use usearch::ffi::{IndexOptions, ScalarKind};
use usearch::Index;

use crate::logger::{LogLevel, Logger};
Expand All @@ -32,15 +29,10 @@ pub const INIT_MSG: u32 = 0x13333337;
pub const END_MSG: u32 = 0x31333337;
pub const ERR_MSG: u32 = 0x37333337;
// magic byte + pq + metric_kind + quantization + dim + m + efc + ef + num_centroids +
-// num_subvectors + capacity
-static INDEX_HEADER_LENGTH: usize = INTEGER_SIZE * 11;
+// num_subvectors + capacity + element_bits
+static INDEX_HEADER_LENGTH: usize = INTEGER_SIZE * 12;

-enum VectorType {
-F32(Vec<f32>),
-I8(Vec<i8>),
-}
-
-type Row = (u64, VectorType);
+type Row = (u64, Vec<u8>);

struct ThreadSafeIndex(Index);

@@ -84,15 +76,15 @@ fn parse_index_options(
logger: Arc<Logger>,
stream: Arc<Mutex<dyn Connection>>,
buf: &[u8],
-) -> Result<IndexOptions, anyhow::Error> {
-let mut params: [u32; 9] = [0; 9];
+) -> Result<(IndexOptions, u32, u32), anyhow::Error> {
+let mut params: [u32; 11] = [0; 11];

for i in 0..params.len() {
let start_idx = INTEGER_SIZE * i;
params[i] = u32::from_le_bytes(buf[start_idx..(start_idx + INTEGER_SIZE)].try_into()?);
}

-let [pq, metric_kind, quantization, dim, m, ef_construction, ef, num_centroids, num_subvectors] =
+let [pq, metric_kind, quantization, dim, m, ef_construction, ef, num_centroids, num_subvectors, estimated_capacity, element_bits] =
params;

let pq = pq == 1;
@@ -108,7 +100,7 @@

let metric = UMetricKind::from_u32(metric_kind)?.value();

logger.info(&format!("Index Params - pq: {pq}, metric_kind: {:?}, quantization: {:?}, dim: {dim}, m: {m}, ef_construction: {ef_construction}, ef: {ef}, num_subvectors: {num_subvectors}, num_centroids: {num_centroids}", metric, quantization));
logger.info(&format!("Index Params - pq: {pq}, metric_kind: {:?}, quantization: {:?}, dim: {dim}, m: {m}, ef_construction: {ef_construction}, ef: {ef}, num_subvectors: {num_subvectors}, num_centroids: {num_centroids}, element_bits: {element_bits}", metric, quantization));

let mut pq_codebook: *const f32 = std::ptr::null();

@@ -135,55 +127,46 @@
pq_codebook = codebook.as_ptr();
}

-Ok(IndexOptions {
-dimensions: dim as usize,
-metric,
-quantization,
-multi: false,
-connectivity: m as usize,
-expansion_add: ef_construction as usize,
-expansion_search: ef as usize,
-num_threads: 0, // automatic
-// note: pq_construction and pq_output distinction is not yet implemented in usearch
-// in the future, if pq_construction is false, we will use full vectors in memory (and
-// require large memory for construction) but will output pq-quantized graph
-//
-// currently, regardless of pq_construction value, as long as pq_output is true,
-// we construct a pq_quantized index using quantized values during construction
-pq_construction: pq,
-pq_output: pq,
-num_centroids: num_centroids as usize,
-num_subvectors: num_subvectors as usize,
-codebook: pq_codebook,
-})
+Ok((
+IndexOptions {
+dimensions: dim as usize,
+metric,
+quantization,
+multi: false,
+connectivity: m as usize,
+expansion_add: ef_construction as usize,
+expansion_search: ef as usize,
+num_threads: 0, // automatic
+// note: pq_construction and pq_output distinction is not yet implemented in usearch
+// in the future, if pq_construction is false, we will use full vectors in memory (and
+// require large memory for construction) but will output pq-quantized graph
+//
+// currently, regardless of pq_construction value, as long as pq_output is true,
+// we construct a pq_quantized index using quantized values during construction
+pq_construction: pq,
+pq_output: pq,
+num_centroids: num_centroids as usize,
+num_subvectors: num_subvectors as usize,
+codebook: pq_codebook,
+},
+element_bits,
+estimated_capacity,
+))
}

fn bytes_to_f32_vec_le(bytes: &[u8]) -> Vec<f32> {
let mut float_vec = Vec::with_capacity(bytes.len() / 4);

for chunk in bytes.chunks_exact(4) {
-float_vec.push(LittleEndian::read_f32(chunk));
+float_vec.push(f32::from_le_bytes(chunk.try_into().unwrap()));
}

float_vec
}

-fn parse_tuple(buf: &[u8], element_bits: usize) -> Result<Row, anyhow::Error> {
+fn parse_tuple(buf: &[u8]) -> Result<Row, anyhow::Error> {
let label = u64::from_le_bytes(buf[..LABEL_SIZE].try_into()?);
-let vec: VectorType = match element_bits {
-1 => VectorType::I8(
-buf[LABEL_SIZE..]
-.iter()
-.map(|e| {
-BitSlice::<_, Lsb0>::from_element(e)
-.iter()
-.map(|n| if *n.as_ref() { 0 } else { 1 })
-.collect::<Vec<i8>>()
-})
-.concat(),
-),
-_ => VectorType::F32(bytes_to_f32_vec_le(&buf[LABEL_SIZE..])),
-};
+let vec: Vec<u8> = buf[LABEL_SIZE..].to_vec();

Ok((label, vec))
}
@@ -199,37 +182,28 @@ fn initialize_index(
match read_frame(&mut soc_stream, buf, INDEX_HEADER_LENGTH, Some(INIT_MSG))? {
ProtocolMessage::Init(buf) => {
drop(soc_stream);
-let index_options = parse_index_options(
+let (index_options, element_bits, estimated_capacity) = parse_index_options(
logger.clone(),
stream.clone(),
-&buf[PROTOCOL_HEADER_SIZE..INDEX_HEADER_LENGTH - PROTOCOL_HEADER_SIZE],
+&buf[PROTOCOL_HEADER_SIZE..INDEX_HEADER_LENGTH],
)?;
+let index = Index::new(&index_options)?;
logger.info(&format!(
"Creating index with parameters dimensions={} m={} ef={} ef_construction={}",
"Creating index with parameters dimensions={} m={} ef={} ef_construction={}, hardware_acceleration={}",
index_options.dimensions,
index_options.connectivity,
index_options.expansion_search,
-index_options.expansion_add
+index_options.expansion_add,
+index.hardware_acceleration()
));

-let index = Index::new(&index_options)?;
-let estimated_capacity: u32 = u32::from_le_bytes(
-buf[INDEX_HEADER_LENGTH - INTEGER_SIZE..INDEX_HEADER_LENGTH]
-.try_into()
-.unwrap(),
-);
logger.info(&format!("Estimated capcity is {estimated_capacity}"));
index.reserve(estimated_capacity as usize)?;
let mut soc_stream = stream.lock().unwrap();
// send success code
soc_stream.write_data(&[0]).unwrap();

-let element_bits = match index_options.metric {
-MetricKind::Hamming => 1,
-_ => INTEGER_SIZE * CHAR_BITS,
-};

-Ok((element_bits, ThreadSafeIndex(index)))
+Ok((element_bits as usize, ThreadSafeIndex(index)))
}
_ => anyhow::bail!("send init message first"),
}
@@ -263,7 +237,7 @@ fn receive_rows(
match read_frame(&mut stream, buf, expected_payload_size, None)? {
ProtocolMessage::Exit => break,
ProtocolMessage::Data(buf) => {
-let row = parse_tuple(&buf, element_bits)?;
+let row = parse_tuple(&buf)?;

if received_rows == current_capacity {
current_capacity *= 2;
@@ -307,7 +281,7 @@ fn read_frame<'a>(
anyhow::bail!("Invalid frame received");
}

-match LittleEndian::read_u32(&buf[0..PROTOCOL_HEADER_SIZE]) {
+match u32::from_le_bytes(buf[0..4].try_into().unwrap()) {
END_MSG => Ok(ProtocolMessage::Exit),
msg => {
if let Some(wanted_hdr) = match_header {
@@ -370,10 +344,7 @@ pub fn create_streaming_usearch_index(

if let Ok(row) = row_result {
let index = index_ref.read().unwrap();
-match row.1 {
-VectorType::F32(vec) => index.0.add(row.0, &vec)?,
-VectorType::I8(vec) => index.0.add(row.0, &vec)?,
-}
+index.0.add_raw(row.0, &row.1, element_bits)?;
} else {
// channel has been closed
break;
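Protocol summary for the server changes above: the init frame grows from 11 to 12 little-endian u32s, the INIT_MSG magic followed by pq, metric_kind, quantization, dim, m, ef_construction, ef, num_centroids, num_subvectors, estimated_capacity, and the new element_bits (32 for f32 elements, 1 for bit-packed vectors), so INDEX_HEADER_LENGTH is now 12 * INTEGER_SIZE. A client-side packing sketch, mirroring the byte layout the tests below send (the helper itself is illustrative):

    const INIT_MSG: u32 = 0x13333337;

    // params order: [pq, metric_kind, quantization, dim, m, ef_construction,
    // ef, num_centroids, num_subvectors, estimated_capacity, element_bits]
    fn pack_init_frame(params: [u32; 11]) -> Vec<u8> {
        std::iter::once(INIT_MSG)
            .chain(params)
            .flat_map(|v| v.to_le_bytes())
            .collect()
    }

The resulting buffer is exactly INDEX_HEADER_LENGTH (48) bytes, which initialize_index now parses in one pass instead of reading the capacity from the frame's tail separately.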
35 changes: 20 additions & 15 deletions lantern_cli/tests/external_index_server_test.rs
@@ -250,6 +250,7 @@ async fn test_external_index_server_indexing() {
(index_options.num_centroids as u32).to_le_bytes(),
(index_options.num_subvectors as u32).to_le_bytes(),
((tuples.len() / 2) as u32).to_le_bytes(), // test resizing
+(32 as u32).to_le_bytes(),
]
.concat();

@@ -398,6 +399,7 @@ async fn test_external_index_server_indexing_ssl() {
(index_options.num_centroids as u32).to_le_bytes(),
(index_options.num_subvectors as u32).to_le_bytes(),
((tuples.len() / 2) as u32).to_le_bytes(), // test resizing
+(32 as u32).to_le_bytes(),
]
.concat();

@@ -510,6 +512,7 @@ async fn test_external_index_server_indexing_scalar_quantization() {
(index_options.num_centroids as u32).to_le_bytes(),
(index_options.num_subvectors as u32).to_le_bytes(),
(tuples.len() as u32).to_le_bytes(),
+(32 as u32).to_le_bytes(),
]
.concat();

@@ -587,21 +590,21 @@ async fn test_external_index_server_indexing_hamming_distance() {
codebook: pq_codebook,
};

-let tuples = vec![
-(0, vec![0.0, 0.0, 0.0]),
-(1, vec![0.0, 0.0, 1.0]),
-(2, vec![0.0, 0.0, 2.0]),
-(3, vec![0.0, 0.0, 3.0]),
-(4, vec![0.0, 1.0, 0.0]),
-(5, vec![0.0, 1.0, 1.0]),
-(6, vec![0.0, 1.0, 2.0]),
-(7, vec![0.0, 1.0, 3.0]),
-(8, vec![1.0, 0.0, 0.0]),
-(9, vec![1.0, 0.0, 1.0]),
-(10, vec![1.0, 0.0, 2.0]),
-(11, vec![1.0, 0.0, 3.0]),
-(12, vec![1.0, 1.0, 0.0]),
-(13, vec![1.0, 1.0, 1.0]),
+let tuples: Vec<(usize, Vec<i8>)> = vec![
+(0, vec![0, 0, 0]),
+(1, vec![0, 0, 1]),
+(2, vec![0, 0, 2]),
+(3, vec![0, 0, 3]),
+(4, vec![0, 1, 0]),
+(5, vec![0, 1, 1]),
+(6, vec![0, 1, 2]),
+(7, vec![0, 1, 3]),
+(8, vec![1, 0, 0]),
+(9, vec![1, 0, 1]),
+(10, vec![1, 0, 2]),
+(11, vec![1, 0, 3]),
+(12, vec![1, 1, 0]),
+(13, vec![1, 1, 1]),
];

let mut stream = TcpStream::connect("127.0.0.1:8998").unwrap();
@@ -622,6 +625,7 @@
(index_options.num_centroids as u32).to_le_bytes(),
(index_options.num_subvectors as u32).to_le_bytes(),
(tuples.len() as u32).to_le_bytes(),
+(1 as u32).to_le_bytes(),
]
.concat();

@@ -741,6 +745,7 @@ async fn test_external_index_server_indexing_pq() {
(index_options.num_centroids as u32).to_le_bytes(),
(index_options.num_subvectors as u32).to_le_bytes(),
(tuples.len() as u32).to_le_bytes(),
+(32 as u32).to_le_bytes(),
]
.concat();

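For the data phase these tests exercise, each row frame is an 8-byte little-endian label followed by the vector's raw bytes; with this PR the server no longer decodes them (parse_tuple above keeps a plain Vec<u8>) and instead forwards the label, the bytes, and element_bits to the usearch fork's add_raw. A sketch of framing one f32 row the way the non-hamming tests do (helper name is illustrative; LABEL_SIZE is 8 per parse_tuple):

    // One row on the wire: u64 label (little-endian), then the raw element bytes.
    fn pack_row_f32(label: u64, vector: &[f32]) -> Vec<u8> {
        let mut buf = label.to_le_bytes().to_vec();
        for v in vector {
            buf.extend_from_slice(&v.to_le_bytes());
        }
        buf
    }

In the hamming test, element_bits is 1, so the payload bytes are interpreted as packed bits rather than f32s; the label framing stays the same.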