Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/aggregation/agg_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ pub(crate) fn build_segment_agg_collector(
&req_data.req,
node.idx_in_req_data,
req_data.segment_ordinal,
req_data.segment_num_docs,
)))
}
AggKind::Histogram => Ok(Box::new(SegmentHistogramCollector::from_req_and_validate(
Expand Down Expand Up @@ -678,6 +679,7 @@ fn build_nodes(
segment_ordinal,
name: agg_name.to_string(),
req: top_hits.clone(),
segment_num_docs: reader.num_docs(),
});
let children = build_children(&req.sub_aggregation, reader, segment_ordinal, data)?;
Ok(vec![AggRefNode {
Expand Down
118 changes: 117 additions & 1 deletion src/aggregation/metric/top_hits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct TopHitsAggReqData {
pub name: String,
/// The top_hits aggregation request.
pub req: TopHitsAggregationReq,
/// The number of documents in the segment.
pub segment_num_docs: u32,
}

impl TopHitsAggReqData {
Expand Down Expand Up @@ -525,13 +527,22 @@ impl TopHitsSegmentCollector {
req: &TopHitsAggregationReq,
accessor_idx: usize,
segment_ordinal: SegmentOrdinal,
segment_num_docs: u32,
) -> Self {
let requested_size = req.size + req.from.unwrap_or(0);
let capped_size = requested_size.min(segment_num_docs as usize);
Self {
top_n: TopNComputer::new(req.size + req.from.unwrap_or(0)),
top_n: TopNComputer::new(capped_size),
segment_ordinal,
accessor_idx,
}
}

#[cfg(test)]
pub(crate) fn topn_capacity(&self) -> usize {
self.top_n.buffer_capacity()
}

fn into_top_hits_collector(
self,
value_accessors: &HashMap<String, Vec<DynamicColumn>>,
Expand Down Expand Up @@ -924,4 +935,109 @@ mod tests {
fn test_aggregation_top_hits_multi_segment() -> crate::Result<()> {
test_aggregation_top_hits(false)
}

#[test]
fn test_top_hits_segment_collector_caps_buffer() {
use super::{TopHitsAggregationReq, TopHitsSegmentCollector};

// Test that buffer is capped when size > segment_num_docs
let req = TopHitsAggregationReq {
size: 50000,
from: Some(0),
sort: vec![],
doc_value_fields: vec![],
_source: None,
fields: None,
script_fields: None,
highlight: None,
explain: None,
version: None,
};

// Create collector for a small segment with only 10 docs
let collector = TopHitsSegmentCollector::from_req(&req, 0, 0, 10);

// Buffer should be capped at 2 * min(50000, 10) = 20
assert_eq!(collector.topn_capacity(), 20);
}

#[test]
fn test_top_hits_segment_collector_with_offset_caps_buffer() {
use super::{TopHitsAggregationReq, TopHitsSegmentCollector};

// Test that size + offset is capped to segment_num_docs
let req = TopHitsAggregationReq {
size: 10000,
from: Some(5000),
sort: vec![],
doc_value_fields: vec![],
_source: None,
fields: None,
script_fields: None,
highlight: None,
explain: None,
version: None,
};

// Create collector for a segment with 100 docs
let collector = TopHitsSegmentCollector::from_req(&req, 0, 0, 100);

// Buffer should be capped at 2 * min(15000, 100) = 200
assert_eq!(collector.topn_capacity(), 200);
}

#[test]
fn test_top_hits_segment_collector_no_cap_when_size_smaller() {
use super::{TopHitsAggregationReq, TopHitsSegmentCollector};

// Test that when size < segment_num_docs, no capping occurs
let req = TopHitsAggregationReq {
size: 10,
from: Some(0),
sort: vec![],
doc_value_fields: vec![],
_source: None,
fields: None,
script_fields: None,
highlight: None,
explain: None,
version: None,
};

// Create collector for a large segment with 10000 docs
let collector = TopHitsSegmentCollector::from_req(&req, 0, 0, 10000);

// Buffer should be 2 * min(10, 10000) = 20 (not affected by large segment)
assert_eq!(collector.topn_capacity(), 20);
}

#[test]
fn test_top_hits_segment_collector_simulates_user_case() {
use super::{TopHitsAggregationReq, TopHitsSegmentCollector};

// Simulate the user's actual scenario: 1300 segments with ~77 docs, size=50000
let req = TopHitsAggregationReq {
size: 50000,
from: Some(0),
sort: vec![],
doc_value_fields: vec![],
_source: None,
fields: None,
script_fields: None,
highlight: None,
explain: None,
version: None,
};

// Each segment has approximately 77 docs
let segment_num_docs = 77;
let collector = TopHitsSegmentCollector::from_req(&req, 0, 0, segment_num_docs);

// Buffer should be capped at 2 * min(50000, 77) = 154
assert_eq!(collector.topn_capacity(), 154);

// Without the fix: would allocate 2 * 50000 = 100,000 elements
// With the fix: allocates only 2 * 77 = 154 elements
// For 1300 segments: saves 130M - 200K = ~129.8M elements!
}
}
115 changes: 113 additions & 2 deletions src/collector/top_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,11 @@ where T: PartialOrd + Clone
pub(crate) fn for_segment<F: PartialOrd + Clone>(
&self,
segment_id: SegmentOrdinal,
_: &SegmentReader,
reader: &SegmentReader,
) -> TopSegmentCollector<F> {
TopSegmentCollector::new(segment_id, self.limit + self.offset)
let requested_size = self.limit + self.offset;
let capped_size = requested_size.min(reader.num_docs() as usize);
TopSegmentCollector::new(segment_id, capped_size)
}

/// Create a new TopCollector with the same limit and offset.
Expand Down Expand Up @@ -166,6 +168,11 @@ impl<T: PartialOrd + Clone> TopSegmentCollector<T> {
}

impl<T: PartialOrd + Clone> TopSegmentCollector<T> {
#[cfg(test)]
pub(crate) fn topn_capacity(&self) -> usize {
self.topn_computer.buffer_capacity()
}

pub fn harvest(self) -> Vec<(T, DocAddress)> {
let segment_ord = self.segment_ord;
self.topn_computer
Expand Down Expand Up @@ -304,6 +311,110 @@ mod tests {

assert_eq!(results, vec![]);
}

#[test]
fn test_for_segment_caps_buffer_to_segment_size() -> crate::Result<()> {
use crate::schema::{Schema, TEXT};
use crate::Index;

// Create an index with a small segment
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?;

// Add only 5 documents
for i in 0..5 {
index_writer.add_document(doc!(text_field => format!("doc {}", i)))?;
}
index_writer.commit()?;

let reader = index.reader()?;
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);

// Verify segment has 5 docs
assert_eq!(segment_reader.num_docs(), 5);

// Request a limit much larger than segment size
let collector = TopCollector::<f32>::with_limit(1000);
let segment_collector: TopSegmentCollector<f32> = collector.for_segment(0, segment_reader);

// The buffer capacity should be capped at 2 * segment_size = 10
// (TopNComputer allocates 2x the requested size)
assert_eq!(segment_collector.topn_capacity(), 10);

Ok(())
}

#[test]
fn test_for_segment_with_offset_caps_buffer() -> crate::Result<()> {
use crate::schema::{Schema, TEXT};
use crate::Index;

// Create an index with a small segment
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?;

// Add 10 documents
for i in 0..10 {
index_writer.add_document(doc!(text_field => format!("doc {}", i)))?;
}
index_writer.commit()?;

let reader = index.reader()?;
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);

assert_eq!(segment_reader.num_docs(), 10);

// Request limit=1000 + offset=500 = 1500 total, but segment only has 10 docs
let collector = TopCollector::<f32>::with_limit(1000).and_offset(500);
let segment_collector: TopSegmentCollector<f32> = collector.for_segment(0, segment_reader);

// Buffer should be capped at 2 * min(1500, 10) = 20
assert_eq!(segment_collector.topn_capacity(), 20);

Ok(())
}

#[test]
fn test_for_segment_doesnt_cap_when_limit_smaller() -> crate::Result<()> {
use crate::schema::{Schema, TEXT};
use crate::Index;

// Create an index with a larger segment
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?;

// Add 100 documents
for i in 0..100 {
index_writer.add_document(doc!(text_field => format!("doc {}", i)))?;
}
index_writer.commit()?;

let reader = index.reader()?;
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);

assert_eq!(segment_reader.num_docs(), 100);

// Request a small limit (10) from a large segment (100)
let collector = TopCollector::<f32>::with_limit(10);
let segment_collector: TopSegmentCollector<f32> = collector.for_segment(0, segment_reader);

// Buffer should be 2 * min(10, 100) = 20 (not affected by segment size)
assert_eq!(segment_collector.topn_capacity(), 20);

Ok(())
}
}

#[cfg(all(test, feature = "unstable"))]
Expand Down
9 changes: 8 additions & 1 deletion src/collector/top_score_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,8 @@ impl Collector for TopDocs {
reader: &SegmentReader,
) -> crate::Result<<Self::Child as SegmentCollector>::Fruit> {
let heap_len = self.0.limit + self.0.offset;
let mut top_n: TopNComputer<_, _> = TopNComputer::new(heap_len);
let capped_heap_len = heap_len.min(reader.num_docs() as usize);
let mut top_n: TopNComputer<_, _> = TopNComputer::new(capped_heap_len);

if let Some(alive_bitset) = reader.alive_bitset() {
let mut threshold = Score::MIN;
Expand Down Expand Up @@ -1046,6 +1047,11 @@ where
}
self.buffer
}

#[cfg(test)]
pub(crate) fn buffer_capacity(&self) -> usize {
self.buffer.capacity()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1993,4 +1999,5 @@ mod tests {
]
);
}

}
Loading