Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ futures-util = { version = "0.3.28", optional = true }
fnv = "1.0.7"
half = "2"
bytemuck = { version = "1", features = ["derive"] }
usearch = "2.23.0"

[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"
Expand All @@ -82,6 +83,7 @@ paste = "1.0.11"
more-asserts = "0.3.1"
rand_distr = "0.4.3"
time = { version = "0.3.10", features = ["serde-well-known", "macros"] }
async-trait = "0.1"
postcard = { version = "1.0.4", features = [
"use-std",
], default-features = false }
Expand Down
12 changes: 11 additions & 1 deletion src/index/index_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::SegmentComponent;
use crate::index::SegmentId;
use crate::schema::Schema;
use crate::store::Compressor;
use crate::vector::VectorAnnBuildParams;
use crate::{Inventory, Opstamp, TrackedObject};

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -143,6 +144,7 @@ impl SegmentMeta {
SegmentComponent::FieldNorms => ".fieldnorm".to_string(),
SegmentComponent::Delete => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
SegmentComponent::Vectors => ".vec".to_string(),
SegmentComponent::VectorAnn => ".ann".to_string(),
});
PathBuf::from(path)
}
Expand Down Expand Up @@ -266,6 +268,12 @@ pub struct IndexSettings {
#[serde(default = "default_docstore_blocksize")]
/// The size of each block that will be compressed and written to disk
pub docstore_blocksize: usize,
/// Optional ANN build parameters for vector index serialization.
///
/// If omitted, usearch defaults are used.
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub vector_ann_build_params: Option<VectorAnnBuildParams>,
}

/// Must be a function to be compatible with serde defaults
Expand All @@ -280,6 +288,7 @@ impl Default for IndexSettings {
docstore_compression: Compressor::default(),
docstore_blocksize: default_docstore_blocksize(),
docstore_compress_dedicated_thread: true,
vector_ann_build_params: None,
}
}
}
Expand Down Expand Up @@ -535,7 +544,8 @@ mod tests {
sort_by_field: None,
docstore_compression: Compressor::default(),
docstore_compress_dedicated_thread: true,
docstore_blocksize: 16_384
docstore_blocksize: 16_384,
vector_ann_build_params: None,
}
);
{
Expand Down
5 changes: 4 additions & 1 deletion src/index/segment_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ pub enum SegmentComponent {
Delete,
/// Vector embeddings stored as JSON lines (one vector per doc_id line).
Vectors,
/// Vector ANN index stored separately from vector data.
VectorAnn,
}

impl SegmentComponent {
/// Iterates through the components.
pub fn iterator() -> slice::Iter<'static, SegmentComponent> {
static SEGMENT_COMPONENTS: [SegmentComponent; 9] = [
static SEGMENT_COMPONENTS: [SegmentComponent; 10] = [
SegmentComponent::Postings,
SegmentComponent::Positions,
SegmentComponent::FastFields,
Expand All @@ -44,6 +46,7 @@ impl SegmentComponent {
SegmentComponent::TempStore,
SegmentComponent::Delete,
SegmentComponent::Vectors,
SegmentComponent::VectorAnn,
];
SEGMENT_COMPONENTS.iter()
}
Expand Down
31 changes: 22 additions & 9 deletions src/index/segment_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{fmt, io};
use fnv::FnvHashMap;
use itertools::Itertools;

use crate::directory::error::OpenReadError;
use crate::directory::{CompositeFile, FileSlice};
use crate::error::DataCorruption;
use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders};
Expand All @@ -16,7 +17,7 @@ use crate::schema::{Field, IndexRecordOption, Schema, Type};
use crate::space_usage::SegmentSpaceUsage;
use crate::store::StoreReader;
use crate::termdict::TermDictionary;
use crate::vector::VectorReader;
use crate::vector::{VectorAnnReader, VectorReader};
use crate::{DocId, Executor, Opstamp};

/// Entry point to access all of the datastructures of the `Segment`
Expand Down Expand Up @@ -48,6 +49,8 @@ pub struct SegmentReader {
store_file: FileSlice,
/// Optional vector file data (present if segment has vector fields)
vector_file_opt: Option<FileSlice>,
/// Optional vector ANN file data (present if segment has ANN indexes)
vector_ann_file_opt: Option<FileSlice>,
alive_bitset_opt: Option<AliveBitSet>,
schema: Schema,
}
Expand Down Expand Up @@ -203,7 +206,12 @@ impl SegmentReader {
};

// Try to load vector file if it exists
let vector_file_opt = segment.open_read(SegmentComponent::Vectors).ok();
let vector_file_opt = match segment.open_read(SegmentComponent::Vectors) {
Ok(vector_file) => Some(vector_file),
Err(OpenReadError::FileDoesNotExist(_)) => None,
Err(e) => return Err(e.into()),
};
let vector_ann_file_opt = segment.open_read(SegmentComponent::VectorAnn).ok();

let alive_bitset_opt = intersect_alive_bitset(original_bitset, custom_bitset);

Expand All @@ -225,6 +233,7 @@ impl SegmentReader {
delete_opstamp: segment.meta().delete_opstamp(),
store_file,
vector_file_opt,
vector_ann_file_opt,
alive_bitset_opt,
positions_composite,
schema,
Expand Down Expand Up @@ -436,13 +445,17 @@ impl SegmentReader {
/// Returns `None` if the segment has no vector data.
/// The `field` parameter is used for API consistency but all fields' vectors
/// are stored in the same file.
pub fn vector_reader(&self, _field: Field) -> Option<VectorReader> {
self.vector_file_opt.as_ref().and_then(|file_slice| {
file_slice
.read_bytes()
.ok()
.and_then(|bytes| VectorReader::open(bytes.as_slice()).ok())
})
pub fn vector_reader(&self, _field: Field) -> io::Result<Option<VectorReader>> {
match self.vector_file_opt.as_ref() {
None => Ok(None),
Some(file_slice) => Ok(Some(VectorReader::open(file_slice.read_bytes()?)?)),
}
}

pub fn vector_ann_reader(&self) -> Option<VectorAnnReader> {
self.vector_ann_file_opt
.as_ref()
.and_then(|file_slice| VectorAnnReader::open(file_slice.clone()).ok())
}

/// Returns the bitset representing the alive `DocId`s.
Expand Down
41 changes: 33 additions & 8 deletions src/indexer/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use measure_time::debug_time;

use common::TerminatingWrite;

use crate::directory::{Directory, WritePtr};
use crate::directory::WritePtr;
use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption;
use crate::fastfield::{AliveBitSet, FastFieldNotAvailableError};
Expand All @@ -22,7 +22,7 @@ use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
use crate::schema::{value_type_to_column_type, Field, FieldType, Schema};
use crate::store::StoreWriter;
use crate::termdict::{TermMerger, TermOrdinal};
use crate::vector::VectorReader;
use crate::vector::{VectorAnnWriter, VectorReader};
use crate::{
DocAddress, DocId, IndexSettings, IndexSortByField, InvertedIndexReader, Order,
SegmentComponent, SegmentOrdinal,
Expand Down Expand Up @@ -804,7 +804,18 @@ impl IndexMerger {
let vector_write = serializer
.segment_mut()
.open_write(SegmentComponent::Vectors)?;
self.write_vectors(vector_write, &doc_id_mapping)?;
let (vector_fields, ann_entries) = self.write_vectors(vector_write, &doc_id_mapping)?;

debug!("write-vector-ann");
let vector_ann_write = serializer
.segment_mut()
.open_write(SegmentComponent::VectorAnn)?;
VectorAnnWriter::serialize_from_merged(
vector_ann_write,
&vector_fields,
ann_entries,
self.index_settings.vector_ann_build_params.as_ref(),
)?;
}

debug!("close-serializer");
Expand All @@ -824,7 +835,10 @@ impl IndexMerger {
&self,
mut wrt: WritePtr,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
) -> crate::Result<(
Vec<Field>,
Vec<(Field, String, u32, Vec<(DocId, Vec<f32>)>)>,
)> {
use std::collections::BTreeSet;
use std::io::Write;

Expand All @@ -842,7 +856,7 @@ impl IndexMerger {
.collect();

if vector_fields.is_empty() {
return Ok(());
return Ok((vector_fields, Vec::new()));
}

// Load vector readers for each segment
Expand All @@ -862,7 +876,9 @@ impl IndexMerger {
}
}

use crate::vector::format::{PresenceBitsetBuilder, VectorEncoding, VECTOR_MAGIC, VECTOR_VERSION};
use crate::vector::format::{
PresenceBitsetBuilder, VectorEncoding, VECTOR_MAGIC, VECTOR_VERSION,
};

// Write V2 header
wrt.write_all(&VECTOR_MAGIC.to_le_bytes())?;
Expand All @@ -878,6 +894,8 @@ impl IndexMerger {

wrt.write_all(&self.max_doc.to_le_bytes())?;

let mut ann_entries: Vec<(Field, String, u32, Vec<(DocId, Vec<f32>)>)> = Vec::new();

// Write vectors for each field in columnar format
for field in &vector_fields {
// Collect all vector IDs from all segments for this field
Expand All @@ -903,6 +921,7 @@ impl IndexMerger {
// First pass: collect vectors and determine dimensions
let mut presence_builder = PresenceBitsetBuilder::new(self.max_doc);
let mut ordered_vectors: Vec<Vec<f32>> = Vec::new();
let mut ordered_doc_vectors: Vec<(DocId, Vec<f32>)> = Vec::new();
let mut dimensions = 0u32;

for (new_doc_id, doc_addr) in
Expand All @@ -920,7 +939,9 @@ impl IndexMerger {
dimensions = vec.len() as u32;
}
presence_builder.set(new_doc_id as u32);
ordered_vectors.push(vec.into_owned());
let owned = vec.into_owned();
ordered_vectors.push(owned.clone());
ordered_doc_vectors.push((new_doc_id as u32, owned));
}
}

Expand All @@ -937,12 +958,16 @@ impl IndexMerger {
wrt.write_all(&v.to_le_bytes())?;
}
}

if !ordered_doc_vectors.is_empty() && dimensions > 0 {
ann_entries.push((*field, vector_id.clone(), dimensions, ordered_doc_vectors));
}
}
}

// Terminate with footer (adds magic bytes required by tantivy's file reading)
wrt.terminate()?;
Ok(())
Ok((vector_fields, ann_entries))
}
}

Expand Down
19 changes: 18 additions & 1 deletion src/indexer/segment_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::schema::document::{Document, ReferenceValue, Value};
use crate::schema::{FieldEntry, FieldType, Schema, Term, DATE_TIME_PRECISION_INDEXED};
use crate::store::{StoreReader, StoreWriter};
use crate::tokenizer::{FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer};
use crate::vector::VectorFieldsWriter;
use crate::vector::{VectorAnnWriter, VectorFieldsWriter};
use crate::{DocId, Opstamp, SegmentComponent, TantivyError};

/// Computes the initial size of the hash table.
Expand Down Expand Up @@ -467,11 +467,28 @@ fn remap_and_write(

// Serialize vectors if there are any vector fields
if vector_fields_writer.has_vector_fields() {
let vector_ann_build_params = serializer
.segment()
.index()
.settings()
.vector_ann_build_params
.clone();
debug!("vector-serialize");
let vector_write = serializer
.segment_mut()
.open_write(SegmentComponent::Vectors)?;
vector_fields_writer.serialize(vector_write, doc_id_map)?;

debug!("vector-ann-serialize");
let vector_ann_write = serializer
.segment_mut()
.open_write(SegmentComponent::VectorAnn)?;
VectorAnnWriter::serialize_from_writer(
&vector_fields_writer,
vector_ann_write,
doc_id_map,
vector_ann_build_params.as_ref(),
)?;
}

// finalize temp docstore and create version, which reflects the doc_id_map
Expand Down
2 changes: 2 additions & 0 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod set_query;
mod term_query;
mod union;
mod weight;
mod vector_ann_query;

#[cfg(test)]
mod vec_docset;
Expand Down Expand Up @@ -63,6 +64,7 @@ pub use self::scorer::Scorer;
pub use self::set_query::TermSetQuery;
pub use self::term_query::TermQuery;
pub use self::union::Union;
pub use self::vector_ann_query::VectorAnnQuery;
#[cfg(test)]
pub use self::vec_docset::VecDocSet;
pub use self::weight::Weight;
Expand Down
Loading