Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/index/inverted_index_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ impl InvertedIndexReader {
if option.has_positions() {
let positions_data = self
.positions_file_slice
.read_bytes_slice(term_info.positions_range.clone())?;
let position_reader = PositionReader::open(positions_data)?;
.slice(term_info.positions_range.clone());
let position_reader = PositionReader::open_lazy(positions_data)?;
Some(position_reader)
} else {
None
Expand Down
167 changes: 147 additions & 20 deletions src/positions/reader.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::io;

use common::{BinarySerializable, VInt};
use common::{BinarySerializable, HasLen, VInt};

use crate::directory::OwnedBytes;
use crate::directory::{FileSlice, OwnedBytes};
use crate::positions::COMPRESSION_BLOCK_SIZE;
use crate::postings::compression::{BlockDecoder, VIntDecoder};

Expand All @@ -17,7 +17,13 @@ use crate::postings::compression::{BlockDecoder, VIntDecoder};
/// bytes.

#[derive(Clone)]
pub struct PositionReader {
pub enum PositionReader {
Eager(EagerPositionReader),
Lazy(LazyPositionReader),
}

#[derive(Clone)]
pub struct EagerPositionReader {
bit_widths: OwnedBytes,
positions: OwnedBytes,

Expand All @@ -43,27 +49,55 @@ impl PositionReader {
pub fn open(mut positions_data: OwnedBytes) -> io::Result<PositionReader> {
let num_positions_bitpacked_blocks = VInt::deserialize(&mut positions_data)?.0 as usize;
let (bit_widths, positions) = positions_data.split(num_positions_bitpacked_blocks);
Ok(PositionReader {
Ok(PositionReader::Eager(EagerPositionReader {
bit_widths: bit_widths.clone(),
positions: positions.clone(),
block_decoder: BlockDecoder::default(),
block_offset: i64::MAX as u64,
anchor_offset: 0u64,
original_bit_widths: bit_widths,
original_positions: positions,
})
}))
}

pub fn open_lazy(positions_data: FileSlice) -> io::Result<PositionReader> {
let prefix_len = positions_data.len().min(10);
let mut prefix = positions_data.read_bytes_slice(0..prefix_len)?;
let prefix_before = prefix.len();
let num_positions_bitpacked_blocks = VInt::deserialize(&mut prefix)?.0 as usize;
let vint_len = prefix_before - prefix.len();
let bit_widths_end = vint_len + num_positions_bitpacked_blocks;

let mut bit_widths = positions_data.read_bytes_slice(0..bit_widths_end)?;
VInt::deserialize(&mut bit_widths)?;
let positions = positions_data.slice(bit_widths_end..positions_data.len());
Ok(PositionReader::Lazy(LazyPositionReader {
bit_widths: bit_widths.clone(),
positions,
block_decoder: BlockDecoder::default(),
block_offset: i64::MAX as u64,
anchor_offset: 0u64,
position_byte_anchor: 0,
original_bit_widths: bit_widths,
}))
}

pub fn read(&mut self, offset: u64, output: &mut [u32]) {
match self {
PositionReader::Eager(reader) => reader.read(offset, output),
PositionReader::Lazy(reader) => reader.read(offset, output),
}
}
}

impl EagerPositionReader {
fn reset(&mut self) {
self.positions = self.original_positions.clone();
self.bit_widths = self.original_bit_widths.clone();
self.block_offset = i64::MAX as u64;
self.anchor_offset = 0u64;
}

/// Advance from num_blocks bitpacked blocks.
///
/// Panics if there are not that many remaining blocks.
fn advance_num_blocks(&mut self, num_blocks: usize) {
let num_bits: usize = self.bit_widths.as_ref()[..num_blocks]
.iter()
Expand All @@ -76,9 +110,6 @@ impl PositionReader {
self.anchor_offset += (num_blocks * COMPRESSION_BLOCK_SIZE) as u64;
}

/// block_rel_id is counted relatively to the anchor.
/// block_rel_id = 0 means the anchor block.
/// block_rel_id = i means the ith block after the anchor block.
fn load_block(&mut self, block_rel_id: usize) {
let bit_widths = self.bit_widths.as_slice();
let byte_offset: usize = bit_widths[0..block_rel_id]
Expand All @@ -89,21 +120,121 @@ impl PositionReader {
/ 8;
let compressed_data = &self.positions.as_slice()[byte_offset..];
if bit_widths.len() > block_rel_id {
// that block is bitpacked.
let bit_width = bit_widths[block_rel_id];
self.block_decoder
.uncompress_block_unsorted(compressed_data, bit_width, false);
} else {
// that block is vint encoded.
self.block_decoder
.uncompress_vint_unsorted_until_end(compressed_data);
}
self.block_offset = self.anchor_offset + (block_rel_id * COMPRESSION_BLOCK_SIZE) as u64;
}

/// Fills a buffer with the positions `[offset..offset+output.len())` integers.
///
/// This function is optimized to be called with increasing values of `offset`.
pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) {
if offset < self.anchor_offset {
self.reset();
}
let delta_to_block_offset = offset as i64 - self.block_offset as i64;
if !(0..128).contains(&delta_to_block_offset) {
let delta_to_anchor_offset = offset - self.anchor_offset;
let num_blocks_to_skip =
(delta_to_anchor_offset / (COMPRESSION_BLOCK_SIZE as u64)) as usize;
self.advance_num_blocks(num_blocks_to_skip);
self.load_block(0);
} else {
let num_blocks_to_skip =
((self.block_offset - self.anchor_offset) / COMPRESSION_BLOCK_SIZE as u64) as usize;
self.advance_num_blocks(num_blocks_to_skip);
}

for i in 1.. {
let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE;
let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block;
if remaining_in_block >= output.len() {
output.copy_from_slice(
&self.block_decoder.output_array()[offset_in_block..][..output.len()],
);
break;
}
output[..remaining_in_block]
.copy_from_slice(&self.block_decoder.output_array()[offset_in_block..]);
output = &mut output[remaining_in_block..];
offset += remaining_in_block as u64;
self.load_block(i);
}
}
}

#[derive(Clone)]
pub struct LazyPositionReader {
bit_widths: OwnedBytes,
positions: FileSlice,

block_decoder: BlockDecoder,

block_offset: u64,
anchor_offset: u64,
position_byte_anchor: usize,

original_bit_widths: OwnedBytes,
}

impl LazyPositionReader {
fn reset(&mut self) {
self.bit_widths = self.original_bit_widths.clone();
self.block_offset = i64::MAX as u64;
self.anchor_offset = 0u64;
self.position_byte_anchor = 0;
}

fn advance_num_blocks(&mut self, num_blocks: usize) {
let num_bits: usize = self.bit_widths.as_ref()[..num_blocks]
.iter()
.cloned()
.map(|num_bits| num_bits as usize)
.sum();
let num_bytes_to_skip = num_bits * COMPRESSION_BLOCK_SIZE / 8;
self.bit_widths.advance(num_blocks);
self.position_byte_anchor += num_bytes_to_skip;
self.anchor_offset += (num_blocks * COMPRESSION_BLOCK_SIZE) as u64;
}

fn load_block(&mut self, block_rel_id: usize) {
let bit_widths = self.bit_widths.as_slice();
let byte_offset: usize = bit_widths[0..block_rel_id]
.iter()
.map(|&b| b as usize)
.sum::<usize>()
* COMPRESSION_BLOCK_SIZE
/ 8;
if bit_widths.len() > block_rel_id {
let bit_width = bit_widths[block_rel_id];
let start = self.position_byte_anchor + byte_offset;
let end = start + (bit_width as usize * COMPRESSION_BLOCK_SIZE / 8);
let compressed_data = if start == end {
OwnedBytes::new(Vec::new())
} else {
self.positions
.read_bytes_slice(start..end)
.expect("failed to lazy-read bitpacked positions")
};
self.block_decoder.uncompress_block_unsorted(
compressed_data.as_slice(),
bit_width,
false,
);
} else {
let start = self.position_byte_anchor + byte_offset;
let compressed_data = self
.positions
.read_bytes_slice(start..self.positions.len())
.expect("failed to lazy-read vint positions");
self.block_decoder
.uncompress_vint_unsorted_until_end(compressed_data.as_slice());
}
self.block_offset = self.anchor_offset + (block_rel_id * COMPRESSION_BLOCK_SIZE) as u64;
}

pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) {
if offset < self.anchor_offset {
self.reset();
Expand All @@ -126,10 +257,7 @@ impl PositionReader {
self.advance_num_blocks(num_blocks_to_skip);
}

// At this point, the block containing offset is loaded, and anchor has
// been updated to point to it as well.
for i in 1.. {
// we copy the part from block i - 1 that is relevant.
let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE;
let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block;
if remaining_in_block >= output.len() {
Expand All @@ -141,7 +269,6 @@ impl PositionReader {
output[..remaining_in_block]
.copy_from_slice(&self.block_decoder.output_array()[offset_in_block..]);
output = &mut output[remaining_in_block..];
// we load block #i if necessary.
offset += remaining_in_block as u64;
self.load_block(i);
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub use self::fuzzy_query::FuzzyTermQuery;
pub use self::intersection::{intersect_scorers, Intersection};
pub use self::more_like_this::{MoreLikeThisQuery, MoreLikeThisQueryBuilder};
pub use self::phrase_prefix_query::PhrasePrefixQuery;
pub use self::phrase_query::PhraseQuery;
pub use self::phrase_query::{PhraseQuery, PhraseQueryStatsSnapshot};
pub use self::query::{EnableScoring, Query, QueryClone};
pub use self::query_parser::{QueryParser, QueryParserError};
pub use self::range_query::{FastFieldRangeWeight, IPFastFieldRangeWeight, RangeQuery};
Expand Down
3 changes: 2 additions & 1 deletion src/query/phrase_query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ mod phrase_query;
mod phrase_scorer;
mod phrase_weight;

pub use self::phrase_query::PhraseQuery;
pub(crate) use self::phrase_query::PhraseQueryStats;
pub use self::phrase_query::{PhraseQuery, PhraseQueryStatsSnapshot};
pub(crate) use self::phrase_scorer::intersection_count;
pub use self::phrase_scorer::PhraseScorer;
pub use self::phrase_weight::PhraseWeight;
Expand Down
Loading