Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions api/seqproxyapi/v1/seq_proxy_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package seqproxyapi.v1;

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

import "google/api/annotations.proto";

// seq-db public api. Exposes APIs related to document querying.
Expand Down Expand Up @@ -216,17 +217,19 @@ message SearchRequest {
bool with_total = 4; // Should total number of documents be returned in response.
Order order = 5; // Document order ORDER_DESC/ORDER_ASC.
string offset_id = 6; // ID offset for pagination.
uint32 downsample = 7; // If set, returns roughly 1 in N documents on a probabilistic basis
}

message ComplexSearchRequest {
SearchQuery query = 1; // Search query.
repeated AggQuery aggs = 2; // List of aggregation queries.
optional HistQuery hist = 3; // Histogram query.
int64 size = 4; // Maximum number of documents to return.
int64 offset = 5; // Search offset.
bool with_total = 6; // Should total number of documents be returned in response.
Order order = 7; // Document order ORDER_DESC/ORDER_ASC.
string offset_id = 8; // ID offset for pagination.
SearchQuery query = 1; // Search query.
repeated AggQuery aggs = 2; // List of aggregation queries.
optional HistQuery hist = 3; // Histogram query.
int64 size = 4; // Maximum number of documents to return.
int64 offset = 5; // Search offset.
bool with_total = 6; // Should total number of documents be returned in response.
Order order = 7; // Document order ORDER_DESC/ORDER_ASC.
string offset_id = 8; // ID offset for pagination.
uint32 downsample = 9; // If set, returns roughly 1 in N documents on a probabilistic basis
}

message SearchResponse {
Expand Down
1 change: 1 addition & 0 deletions api/storeapi/store_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ message SearchRequest {
repeated AggQuery aggs = 12;
Order order = 13;
string offset_id = 14;
uint32 downsample = 15; // If set, returns roughly 1 in N documents on a probabilistic basis
}

message SearchResponse {
Expand Down
216 changes: 216 additions & 0 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1912,6 +1912,215 @@ func (s *FractionTestSuite) TestFractionInfo() {
}
}

func (s *FractionTestSuite) TestSearchDownsample() {
const (
totalDocs = 5000
bulkSize = 200
queryAll = "message:*"
queryFiltered = "message:started"
tolerancePct = 3 // ±3% tolerance due to probabilistic sampling
)

_, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize)
s.insertDocuments(bulks...)

baseOpts := []searchOption{
withFrom(fromTime.Format(time.RFC3339Nano)),
withTo(toTime.Format(time.RFC3339Nano)),
}

// Step 1: verify that all documents are indexed and searchable
allResult, err := s.fraction.Search(context.Background(), *s.query(queryAll, baseOpts...))
s.Require().NoError(err, "search for all documents should succeed")
s.Require().Equal(totalDocs, allResult.IDs.Len(),
"all %d documents should be found without downsample", totalDocs)

// Step 2: find how many documents match the filtered query (message:started)
// This count serves as the baseline for downsample expectations.
filteredResult, err := s.fraction.Search(context.Background(), *s.query(queryFiltered, baseOpts...))
s.Require().NoError(err, "search for filtered documents should succeed")
filteredDocCount := filteredResult.IDs.Len()
s.Require().Greater(filteredDocCount, 0, "at least one document should match %q", queryFiltered)

// Step 3: verify downsample produces approximately expected document counts
// With downsample=k, each document has a 1/k probability of being included,
// so we expect approximately total/k documents with ±tolerancePercent tolerance.
tolerance := filteredDocCount * tolerancePct / 100
downsampleValues := []int{10, 20, 50, 100}
for _, ds := range downsampleValues {
s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) {
query := s.query(queryFiltered, append(baseOpts, withDownsample(uint32(ds)))...)
result, err := s.fraction.Search(context.Background(), *query)
s.Require().NoError(err, "search with downsample=%d should succeed", ds)

sampledLen := result.IDs.Len()
expectedCount := filteredDocCount / ds
lowerBound := expectedCount - tolerance
upperBound := expectedCount + tolerance

s.Require().GreaterOrEqual(sampledLen, lowerBound,
"downsample=%d: sampled count %d should be >= %d (expected %d - tolerance %d)",
ds, sampledLen, lowerBound, expectedCount, tolerance)
s.Require().LessOrEqual(sampledLen, upperBound,
"downsample=%d: sampled count %d should be <= %d (expected %d + tolerance %d)",
ds, sampledLen, upperBound, expectedCount, tolerance)
})
}
}

func (s *FractionTestSuite) TestSearchDownsampleWithTotal() {
const (
totalDocs = 1000
bulkSize = 200
tolerancePct = 3 // percent tolerance for sampled document count
)

_, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize)
s.insertDocuments(bulks...)

// tolerance window: ±3% of total documents
tolerance := totalDocs * tolerancePct / 100

// downsample values to test: each should return ~1/ds of total documents
downsampleValues := []int{10, 20, 50, 100}

for _, ds := range downsampleValues {
ds := ds // capture loop variable
s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) {
params := s.query(
"message:*",
withFrom(fromTime.Format(time.RFC3339Nano)),
withTo(toTime.Format(time.RFC3339Nano)),
withDownsample(uint32(ds)),
withTotal(),
)
result, err := s.fraction.Search(context.Background(), *params)
s.Require().NoError(err, "search with downsample=%d failed", ds)

sampledDocs := result.IDs.Len()
expectedDocs := totalDocs / ds // with downsample=k, expect approximately totalDocs/k documents

s.Require().Less(sampledDocs, expectedDocs+tolerance,
"downsample=%d: sampled docs (%d) should be less than expected (%d) + tolerance (%d)",
ds, sampledDocs, expectedDocs, tolerance)
s.Require().Greater(sampledDocs, expectedDocs-tolerance,
"downsample=%d: sampled docs (%d) should be greater than expected (%d) - tolerance (%d)",
ds, sampledDocs, expectedDocs, tolerance)

// Total field must always reflect the full document count, regardless of downsample
s.Require().Equal(totalDocs, int(result.Total),
"downsample=%d: total should not be affected by downsample", ds)
})
}
}

func (s *FractionTestSuite) TestSearchDownsampleWithAggAndHist() {
const (
totalDocs = 1000
bulkSize = 200
hist = 8
downsample = 20
)

_, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize)
s.insertDocuments(bulks...)

commonOpts := []searchOption{
withFrom(fromTime.Format(time.RFC3339Nano)),
withTo(toTime.Format(time.RFC3339Nano)),
withHist(uint64(hist)),
withAggQuery(processor.AggQuery{
GroupBy: aggField("service"),
Func: seq.AggFuncCount,
}),
}

s.T().Run("without downsample", func(t *testing.T) {
paramsNoDS := s.query("message:started", commonOpts...)
qprNoDS, err := s.fraction.Search(context.Background(), *paramsNoDS)
s.Require().NoError(err, "search without downsample failed")
s.Require().NotNil(qprNoDS, "search result must not be nil")
s.Require().Greater(len(qprNoDS.Aggs), 0, "should have aggregation results")

// Verify the histogram has a reasonable number of buckets.
s.Require().Greater(len(qprNoDS.Histogram), 0, "histogram should have at least one bucket")
s.Require().LessOrEqual(len(qprNoDS.Histogram), totalDocs/hist,
"histogram buckets (%d) should not exceed cntDocs/hist=%d",
len(qprNoDS.Histogram), totalDocs/hist)

s.T().Run("with downsample", func(t *testing.T) {
paramsDS := s.query("message:started", append(commonOpts, withDownsample(downsample))...)
qprDS, err := s.fraction.Search(context.Background(), *paramsDS)
s.Require().NoError(err, "search with downsample=%d failed", downsample)
s.Require().NotNil(qprDS, "search result must not be nil")
s.Require().Equal(qprNoDS.Histogram, qprDS.Histogram, "histogram should match without downsample")
assertAggregationsEqual(s, qprNoDS, qprDS)
})
})
}

// assertAggregationsEqual verifies that two search results have identical aggregation data.
func assertAggregationsEqual(s *FractionTestSuite, expected, actual *seq.QPR) {
s.Require().Equal(len(expected.Aggs), len(actual.Aggs),
"number of aggregation groups should be the same; "+
"aggregations are computed on the full document set and are not affected by downsample")

for i := range expected.Aggs {
expAgg := &expected.Aggs[i]
actAgg := &actual.Aggs[i]

s.Require().Equal(len(expAgg.SamplesByBin), len(actAgg.SamplesByBin),
"number of aggregation bins should be the same for agg group %d", i)

for bin, expSample := range expAgg.SamplesByBin {
actSample, ok := actAgg.SamplesByBin[bin]
s.Require().True(ok, "bin %v should exist in downsample results for agg group %d", bin, i)
// Total count is computed from the full document set and must match exactly.
s.Require().Equal(expSample.Total, actSample.Total,
"aggregation total for bin %v in agg group %d should be the same "+
"(aggregations are computed on the full document set, not sampled)", bin, i)
}
}
}

func (s *FractionTestSuite) TestSearchDownsampleZeroAndOne() {
const (
totalDocs = 5000
bulkSize = 200
queryAll = "message:*"
)

_, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize)
s.insertDocuments(bulks...)

baseOpts := []searchOption{
withFrom(fromTime.Format(time.RFC3339Nano)),
withTo(toTime.Format(time.RFC3339Nano)),
}

// searchAndAssertIDs is a local helper that runs a search with the given options
// and asserts that the result contains exactly totalDocs documents.
searchAndAssertIDs := func(name string, opts ...searchOption) {
s.T().Run(name, func(t *testing.T) {
params := s.query(queryAll, append(baseOpts, opts...)...)
qpr, err := s.fraction.Search(context.Background(), *params)
s.Require().NoError(err, "%s: search failed", name)
s.Require().NotNil(qpr, "%s: search result must not be nil", name)
s.Require().Equal(totalDocs, qpr.IDs.Len(),
"%s: expected %d documents, got %d", name, totalDocs, qpr.IDs.Len())
})
}

// downsample=0 (default) — should return all documents
searchAndAssertIDs("downsample=0 (default)")

// downsample=0 explicitly — should return all documents
searchAndAssertIDs("downsample=0 (explicit)", withDownsample(0))

// downsample=1 — should return all documents
searchAndAssertIDs("downsample=1", withDownsample(1))
}

type searchOption func(*processor.SearchParams) error

func (s *FractionTestSuite) query(queryString string, options ...searchOption) *processor.SearchParams {
Expand Down Expand Up @@ -1976,6 +2185,13 @@ func withHist(histInterval uint64) searchOption {
}
}

func withDownsample(k uint32) searchOption {
return func(sp *processor.SearchParams) error {
sp.Downsample = k
return nil
}
}

func aggField(field string) *parser.Literal {
searchAll := []parser.Term{{
Kind: parser.TermSymbol, Data: "*",
Expand Down
Loading
Loading