From f23cbc1227ab5c6f1b79899dc3c70403446052a9 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Mon, 15 Jun 2026 19:45:48 +0300 Subject: [PATCH] feat: add random search downsampling --- api/seqproxyapi/v1/seq_proxy_api.proto | 19 +- api/storeapi/store_api.proto | 1 + frac/fraction_test.go | 216 ++++++++++++++++++ frac/processor/search.go | 103 ++++++--- frac/processor/search_params.go | 2 + pkg/seqproxyapi/v1/seq_proxy_api.pb.go | 28 ++- .../v1/seq_proxy_api_vtproto.pb.go | 110 +++++++++ pkg/storeapi/store_api.pb.go | 16 +- pkg/storeapi/store_api_vtproto.pb.go | 55 +++++ proxy/search/search_request.go | 24 +- proxyapi/grpc_search.go | 13 +- proxyapi/grpc_v1.go | 1 + storeapi/grpc_search.go | 1 + tests/integration_tests/integration_test.go | 79 +++++++ tests/setup/env.go | 6 + 15 files changed, 610 insertions(+), 64 deletions(-) diff --git a/api/seqproxyapi/v1/seq_proxy_api.proto b/api/seqproxyapi/v1/seq_proxy_api.proto index 22ca2f1da..6da046859 100644 --- a/api/seqproxyapi/v1/seq_proxy_api.proto +++ b/api/seqproxyapi/v1/seq_proxy_api.proto @@ -6,6 +6,7 @@ package seqproxyapi.v1; import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; + import "google/api/annotations.proto"; // seq-db public api. Exposes APIs related to document querying. @@ -216,17 +217,19 @@ message SearchRequest { bool with_total = 4; // Should total number of documents be returned in response. Order order = 5; // Document order ORDER_DESC/ORDER_ASC. string offset_id = 6; // ID offset for pagination. + uint32 downsample = 7; // If set, returns roughly 1 in N documents on a probabilistic basis } message ComplexSearchRequest { - SearchQuery query = 1; // Search query. - repeated AggQuery aggs = 2; // List of aggregation queries. - optional HistQuery hist = 3; // Histogram query. - int64 size = 4; // Maximum number of documents to return. - int64 offset = 5; // Search offset. - bool with_total = 6; // Should total number of documents be returned in response. 
- Order order = 7; // Document order ORDER_DESC/ORDER_ASC. - string offset_id = 8; // ID offset for pagination. + SearchQuery query = 1; // Search query. + repeated AggQuery aggs = 2; // List of aggregation queries. + optional HistQuery hist = 3; // Histogram query. + int64 size = 4; // Maximum number of documents to return. + int64 offset = 5; // Search offset. + bool with_total = 6; // Should total number of documents be returned in response. + Order order = 7; // Document order ORDER_DESC/ORDER_ASC. + string offset_id = 8; // ID offset for pagination. + uint32 downsample = 9; // If set, returns roughly 1 in N documents on a probabilistic basis } message SearchResponse { diff --git a/api/storeapi/store_api.proto b/api/storeapi/store_api.proto index 43a343b81..d3b2e2b42 100644 --- a/api/storeapi/store_api.proto +++ b/api/storeapi/store_api.proto @@ -77,6 +77,7 @@ message SearchRequest { repeated AggQuery aggs = 12; Order order = 13; string offset_id = 14; + uint32 downsample = 15; // If set, returns roughly 1 in N documents on a probabilistic basis } message SearchResponse { diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 244aeb99f..35e02713d 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1912,6 +1912,215 @@ func (s *FractionTestSuite) TestFractionInfo() { } } +func (s *FractionTestSuite) TestSearchDownsample() { + const ( + totalDocs = 5000 + bulkSize = 200 + queryAll = "message:*" + queryFiltered = "message:started" + tolerancePct = 3 // ±3% tolerance due to probabilistic sampling + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) 
+ + baseOpts := []searchOption{ + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + } + + // Step 1: verify that all documents are indexed and searchable + allResult, err := s.fraction.Search(context.Background(), *s.query(queryAll, baseOpts...)) + s.Require().NoError(err, "search for all documents should succeed") + s.Require().Equal(totalDocs, allResult.IDs.Len(), + "all %d documents should be found without downsample", totalDocs) + + // Step 2: find how many documents match the filtered query (message:started) + // This count serves as the baseline for downsample expectations. + filteredResult, err := s.fraction.Search(context.Background(), *s.query(queryFiltered, baseOpts...)) + s.Require().NoError(err, "search for filtered documents should succeed") + filteredDocCount := filteredResult.IDs.Len() + s.Require().Greater(filteredDocCount, 0, "at least one document should match %q", queryFiltered) + + // Step 3: verify downsample produces approximately expected document counts + // With downsample=k, each document has a 1/k probability of being included, + // so we expect approximately total/k documents with ±tolerancePercent tolerance. + tolerance := filteredDocCount * tolerancePct / 100 + downsampleValues := []int{10, 20, 50, 100} + for _, ds := range downsampleValues { + s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) { + query := s.query(queryFiltered, append(baseOpts, withDownsample(uint32(ds)))...) 
+ result, err := s.fraction.Search(context.Background(), *query) + s.Require().NoError(err, "search with downsample=%d should succeed", ds) + + sampledLen := result.IDs.Len() + expectedCount := filteredDocCount / ds + lowerBound := expectedCount - tolerance + upperBound := expectedCount + tolerance + + s.Require().GreaterOrEqual(sampledLen, lowerBound, + "downsample=%d: sampled count %d should be >= %d (expected %d - tolerance %d)", + ds, sampledLen, lowerBound, expectedCount, tolerance) + s.Require().LessOrEqual(sampledLen, upperBound, + "downsample=%d: sampled count %d should be <= %d (expected %d + tolerance %d)", + ds, sampledLen, upperBound, expectedCount, tolerance) + }) + } +} + +func (s *FractionTestSuite) TestSearchDownsampleWithTotal() { + const ( + totalDocs = 1000 + bulkSize = 200 + tolerancePct = 3 // percent tolerance for sampled document count + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) + + // tolerance window: ±3% of total documents + tolerance := totalDocs * tolerancePct / 100 + + // downsample values to test: each should return ~1/ds of total documents + downsampleValues := []int{10, 20, 50, 100} + + for _, ds := range downsampleValues { + ds := ds // capture loop variable + s.T().Run(fmt.Sprintf("downsample=%d", ds), func(t *testing.T) { + params := s.query( + "message:*", + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + withDownsample(uint32(ds)), + withTotal(), + ) + result, err := s.fraction.Search(context.Background(), *params) + s.Require().NoError(err, "search with downsample=%d failed", ds) + + sampledDocs := result.IDs.Len() + expectedDocs := totalDocs / ds // with downsample=k, expect approximately totalDocs/k documents + + s.Require().Less(sampledDocs, expectedDocs+tolerance, + "downsample=%d: sampled docs (%d) should be less than expected (%d) + tolerance (%d)", + ds, sampledDocs, expectedDocs, tolerance) + 
s.Require().Greater(sampledDocs, expectedDocs-tolerance, + "downsample=%d: sampled docs (%d) should be greater than expected (%d) - tolerance (%d)", + ds, sampledDocs, expectedDocs, tolerance) + + // Total field must always reflect the full document count, regardless of downsample + s.Require().Equal(totalDocs, int(result.Total), + "downsample=%d: total should not be affected by downsample", ds) + }) + } +} + +func (s *FractionTestSuite) TestSearchDownsampleWithAggAndHist() { + const ( + totalDocs = 1000 + bulkSize = 200 + hist = 8 + downsample = 20 + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) + + commonOpts := []searchOption{ + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + withHist(uint64(hist)), + withAggQuery(processor.AggQuery{ + GroupBy: aggField("service"), + Func: seq.AggFuncCount, + }), + } + + s.T().Run("without downsample", func(t *testing.T) { + paramsNoDS := s.query("message:started", commonOpts...) + qprNoDS, err := s.fraction.Search(context.Background(), *paramsNoDS) + s.Require().NoError(err, "search without downsample failed") + s.Require().NotNil(qprNoDS, "search result must not be nil") + s.Require().Greater(len(qprNoDS.Aggs), 0, "should have aggregation results") + + // Verify the histogram has a reasonable number of buckets. + s.Require().Greater(len(qprNoDS.Histogram), 0, "histogram should have at least one bucket") + s.Require().LessOrEqual(len(qprNoDS.Histogram), totalDocs/hist, + "histogram buckets (%d) should not exceed cntDocs/hist=%d", + len(qprNoDS.Histogram), totalDocs/hist) + + s.T().Run("with downsample", func(t *testing.T) { + paramsDS := s.query("message:started", append(commonOpts, withDownsample(downsample))...) 
+ qprDS, err := s.fraction.Search(context.Background(), *paramsDS) + s.Require().NoError(err, "search with downsample=%d failed", downsample) + s.Require().NotNil(qprDS, "search result must not be nil") + s.Require().Equal(qprNoDS.Histogram, qprDS.Histogram, "histogram should match without downsample") + assertAggregationsEqual(s, qprNoDS, qprDS) + }) + }) +} + +// assertAggregationsEqual verifies that two search results have identical aggregation data. +func assertAggregationsEqual(s *FractionTestSuite, expected, actual *seq.QPR) { + s.Require().Equal(len(expected.Aggs), len(actual.Aggs), + "number of aggregation groups should be the same; "+ + "aggregations are computed on the full document set and are not affected by downsample") + + for i := range expected.Aggs { + expAgg := &expected.Aggs[i] + actAgg := &actual.Aggs[i] + + s.Require().Equal(len(expAgg.SamplesByBin), len(actAgg.SamplesByBin), + "number of aggregation bins should be the same for agg group %d", i) + + for bin, expSample := range expAgg.SamplesByBin { + actSample, ok := actAgg.SamplesByBin[bin] + s.Require().True(ok, "bin %v should exist in downsample results for agg group %d", bin, i) + // Total count is computed from the full document set and must match exactly. + s.Require().Equal(expSample.Total, actSample.Total, + "aggregation total for bin %v in agg group %d should be the same "+ + "(aggregations are computed on the full document set, not sampled)", bin, i) + } + } +} + +func (s *FractionTestSuite) TestSearchDownsampleZeroAndOne() { + const ( + totalDocs = 5000 + bulkSize = 200 + queryAll = "message:*" + ) + + _, bulks, fromTime, toTime := generatesMessages(totalDocs, bulkSize) + s.insertDocuments(bulks...) + + baseOpts := []searchOption{ + withFrom(fromTime.Format(time.RFC3339Nano)), + withTo(toTime.Format(time.RFC3339Nano)), + } + + // searchAndAssertIDs is a local helper that runs a search with the given options + // and asserts that the result contains exactly totalDocs documents. 
+ searchAndAssertIDs := func(name string, opts ...searchOption) { + s.T().Run(name, func(t *testing.T) { + params := s.query(queryAll, append(baseOpts, opts...)...) + qpr, err := s.fraction.Search(context.Background(), *params) + s.Require().NoError(err, "%s: search failed", name) + s.Require().NotNil(qpr, "%s: search result must not be nil", name) + s.Require().Equal(totalDocs, qpr.IDs.Len(), + "%s: expected %d documents, got %d", name, totalDocs, qpr.IDs.Len()) + }) + } + + // downsample=0 (default) — should return all documents + searchAndAssertIDs("downsample=0 (default)") + + // downsample=0 explicitly — should return all documents + searchAndAssertIDs("downsample=0 (explicit)", withDownsample(0)) + + // downsample=1 — should return all documents + searchAndAssertIDs("downsample=1", withDownsample(1)) +} + type searchOption func(*processor.SearchParams) error func (s *FractionTestSuite) query(queryString string, options ...searchOption) *processor.SearchParams { @@ -1976,6 +2185,13 @@ func withHist(histInterval uint64) searchOption { } } +func withDownsample(k uint32) searchOption { + return func(sp *processor.SearchParams) error { + sp.Downsample = k + return nil + } +} + func aggField(field string) *parser.Literal { searchAll := []parser.Term{{ Kind: parser.TermSymbol, Data: "*", diff --git a/frac/processor/search.go b/frac/processor/search.go index 30ed4f45e..8734bcc7f 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -4,6 +4,7 @@ import ( "context" "errors" "math" + "math/rand/v2" "sync" "time" @@ -43,9 +44,10 @@ type searchIndex interface { } type searchBuffers struct { - lids []node.LID - mids []seq.MID - rids []seq.RID + mids []seq.MID + rids []seq.RID + lids []node.LID + sampled []node.LID } var searchBuffersPool = sync.Pool{ @@ -222,8 +224,10 @@ func iterateEvalTree( buffers := searchBuffersPool.Get().(*searchBuffers) defer searchBuffersPool.Put(buffers) + mids := buffers.mids rids := buffers.rids + sampled := buffers.sampled 
batchedEvalTree := batcher(evalTree, buffers.lids) @@ -244,7 +248,10 @@ func iterateEvalTree( break } needLIDs := params.Limit - len(ids) - if needScanAllRange { + + // if full range scan is required OR downsampling is active, + // we must fetch as many LIDs as possible in one batch. + if needScanAllRange || params.Downsample > 1 { needLIDs = math.MaxUint32 } @@ -256,43 +263,48 @@ func iterateEvalTree( break } - needMIDs := min(params.Limit-len(ids), len(lidsSlice)) - if hasHist { - // need to fetch mids for all lids for hist - needMIDs = len(lidsSlice) - } + mids = mids[:0] - // Get MIDs - if needMIDs > 0 { + if hasHist { + // when histogram is needed, retrieve MIDs for ALL LIDs in the batch timerMID.Start() - mids = idsIndex.GetMIDs(lidsSlice[:needMIDs], mids[:0]) + mids = idsIndex.GetMIDs(lidsSlice, mids) timerMID.Stop() - } - // Get RIDs - // compute number of ids we can get here, since some MIDs might have been filtered out - needIDs := min(params.Limit-len(ids), len(lidsSlice)) - if needIDs > 0 { - timerRID.Start() - rids = idsIndex.GetRIDs(lidsSlice[0:needIDs], rids[:0]) - timerRID.Stop() + timerUpdateHist.Start() + hist.Update(mids) + timerUpdateHist.Stop() + + sampled, mids = sampleLIDsWithMIDs(lidsSlice, mids, sampled[:0], params.Downsample) + } else { + sampled = sampleLIDs(lidsSlice, sampled[:0], params.Downsample) } - // Fill IDs for search - for i := 0; i < needIDs; i++ { - id := seq.ID{MID: mids[i], RID: rids[i]} + // trim sampled slice to respect the remaining limit + if params.Limit-len(ids) < len(sampled) { + sampled = sampled[:params.Limit-len(ids)] + } - if i == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one - ids = append(ids, seq.IDSource{ID: id}) + if len(sampled) > 0 { + if len(mids) == 0 { + // if we don't already have MIDs (i.e., no histogram case), fetch them now + timerMID.Start() + mids = idsIndex.GetMIDs(sampled, mids) + timerMID.Stop() } - lastID = id - } - // Update hist map - 
if hasHist { - timerUpdateHist.Start() - hist.Update(mids) - timerUpdateHist.Stop() + timerRID.Start() + rids = idsIndex.GetRIDs(sampled, rids[:0]) + timerRID.Stop() + + // Fill IDs for search + for i := range sampled { + id := seq.ID{MID: mids[i], RID: rids[i]} + if i == 0 || lastID != id { // lids increase monotonically, it's enough to compare current id with the last one + ids = append(ids, seq.IDSource{ID: id}) + } + lastID = id + } } // Update aggregators @@ -323,6 +335,33 @@ func iterateEvalTree( return total, ids, hist, aggs, nil } +func sampleLIDs(in, out []node.LID, k uint32) []node.LID { + if k <= 1 { // 0 or 1 -> no sample + return in + } + for _, lid := range in { + if rand.N(k) == 0 { + out = append(out, lid) + } + } + return out +} + +func sampleLIDsWithMIDs(in []node.LID, mids []seq.MID, out []node.LID, k uint32) ([]node.LID, []seq.MID) { + if k <= 1 { // 0 or 1 -> no sample + return in, mids + } + i := 0 + for j, lid := range in { + if rand.N(k) == 0 { + out = append(out, lid) + mids[i] = mids[j] + i++ + } + } + return out, mids[:i] +} + func tryConvertToBatchedTree(evalTree node.Node) (node.BatchedNode, bool) { switch it := evalTree.(type) { case *lids.IteratorDesc: diff --git a/frac/processor/search_params.go b/frac/processor/search_params.go index de2594b86..4d7279d78 100644 --- a/frac/processor/search_params.go +++ b/frac/processor/search_params.go @@ -47,6 +47,8 @@ type SearchParams struct { WithTotal bool Order seq.DocsOrder + + Downsample uint32 } func (p SearchParams) MarshalLogObject(enc zapcore.ObjectEncoder) error { diff --git a/pkg/seqproxyapi/v1/seq_proxy_api.pb.go b/pkg/seqproxyapi/v1/seq_proxy_api.pb.go index eac9fd1cc..29a70a723 100644 --- a/pkg/seqproxyapi/v1/seq_proxy_api.pb.go +++ b/pkg/seqproxyapi/v1/seq_proxy_api.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: // protoc-gen-go v1.36.5 -// protoc v5.29.3 +// protoc v7.35.0 // source: seqproxyapi/v1/seq_proxy_api.proto package seqproxyapi @@ -719,6 +719,7 @@ type SearchRequest struct { WithTotal bool `protobuf:"varint,4,opt,name=with_total,json=withTotal,proto3" json:"with_total,omitempty"` // Should total number of documents be returned in response. Order Order `protobuf:"varint,5,opt,name=order,proto3,enum=seqproxyapi.v1.Order" json:"order,omitempty"` // Document order ORDER_DESC/ORDER_ASC. OffsetId string `protobuf:"bytes,6,opt,name=offset_id,json=offsetId,proto3" json:"offset_id,omitempty"` // ID offset for pagination. + Downsample uint32 `protobuf:"varint,7,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents, statistically. unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -795,6 +796,13 @@ func (x *SearchRequest) GetOffsetId() string { return "" } +func (x *SearchRequest) GetDownsample() uint32 { + if x != nil { + return x.Downsample + } + return 0 +} + type ComplexSearchRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Query *SearchQuery `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` // Search query. @@ -805,6 +813,7 @@ type ComplexSearchRequest struct { WithTotal bool `protobuf:"varint,6,opt,name=with_total,json=withTotal,proto3" json:"with_total,omitempty"` // Should total number of documents be returned in response. Order Order `protobuf:"varint,7,opt,name=order,proto3,enum=seqproxyapi.v1.Order" json:"order,omitempty"` // Document order ORDER_DESC/ORDER_ASC. OffsetId string `protobuf:"bytes,8,opt,name=offset_id,json=offsetId,proto3" json:"offset_id,omitempty"` // ID offset for pagination. + Downsample uint32 `protobuf:"varint,9,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents, statistically. 
unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -895,6 +904,13 @@ func (x *ComplexSearchRequest) GetOffsetId() string { return "" } +func (x *ComplexSearchRequest) GetDownsample() uint32 { + if x != nil { + return x.Downsample + } + return 0 +} + type SearchResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // Deprecated: Marked as deprecated in seqproxyapi/v1/seq_proxy_api.proto. @@ -2724,7 +2740,7 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x6e, 0x12, 0x38, 0x0a, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x78, 0x70, 0x6c, 0x61, 0x69, 0x6e, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x22, 0xd7, 0x01, 0x0a, 0x0d, + 0x79, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x22, 0xf7, 0x01, 0x0a, 0x0d, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x31, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, @@ -2738,7 +2754,9 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x05, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6f, 0x66, 0x66, - 0x73, 0x65, 0x74, 0x49, 0x64, 0x22, 0xc9, 0x02, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, + 0x73, 0x65, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x6f, 0x77, 0x6e, 0x73, 0x61, 0x6d, + 0x70, 0x6c, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x64, 0x6f, 0x77, 0x6e, 0x73, + 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x22, 
0xe9, 0x02, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x78, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x31, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x53, @@ -2758,7 +2776,9 @@ var file_seqproxyapi_v1_seq_proxy_api_proto_rawDesc = string([]byte{ 0x73, 0x65, 0x71, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x05, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x68, 0x69, 0x73, + 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x6f, 0x77, 0x6e, + 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x64, 0x6f, + 0x77, 0x6e, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x68, 0x69, 0x73, 0x74, 0x22, 0xb0, 0x01, 0x0a, 0x0e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x42, 0x02, diff --git a/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go b/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go index 97f47d659..b45009b1a 100644 --- a/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go +++ b/pkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.go @@ -262,6 +262,7 @@ func (m *SearchRequest) CloneVT() *SearchRequest { r.WithTotal = m.WithTotal r.Order = m.Order r.OffsetId = m.OffsetId + r.Downsample = m.Downsample if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -285,6 +286,7 @@ func (m *ComplexSearchRequest) 
CloneVT() *ComplexSearchRequest { r.WithTotal = m.WithTotal r.Order = m.Order r.OffsetId = m.OffsetId + r.Downsample = m.Downsample if rhs := m.Aggs; rhs != nil { tmpContainer := make([]*AggQuery, len(rhs)) for k, v := range rhs { @@ -1206,6 +1208,9 @@ func (this *SearchRequest) EqualVT(that *SearchRequest) bool { if this.OffsetId != that.OffsetId { return false } + if this.Downsample != that.Downsample { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -1260,6 +1265,9 @@ func (this *ComplexSearchRequest) EqualVT(that *ComplexSearchRequest) bool { if this.OffsetId != that.OffsetId { return false } + if this.Downsample != that.Downsample { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -3256,6 +3264,11 @@ func (m *SearchRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x38 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -3331,6 +3344,11 @@ func (m *ComplexSearchRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x48 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -5524,6 +5542,11 @@ func (m *SearchRequest) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x38 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -5599,6 +5622,11 @@ func (m *ComplexSearchRequest) MarshalToSizedBufferVTStrict(dAtA []byte) (int, e i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if 
m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x48 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -7440,6 +7468,9 @@ func (m *SearchRequest) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.Downsample != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Downsample)) + } n += len(m.unknownFields) return n } @@ -7480,6 +7511,9 @@ func (m *ComplexSearchRequest) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.Downsample != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Downsample)) + } n += len(m.unknownFields) return n } @@ -9613,6 +9647,25 @@ func (m *SearchRequest) UnmarshalVT(dAtA []byte) error { } m.OffsetId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -9879,6 +9932,25 @@ func (m *ComplexSearchRequest) UnmarshalVT(dAtA []byte) error { } m.OffsetId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 9: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -15099,6 +15171,25 @@ func (m *SearchRequest) UnmarshalVTUnsafe(dAtA []byte) error { 
} m.OffsetId = stringValue iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -15369,6 +15460,25 @@ func (m *ComplexSearchRequest) UnmarshalVTUnsafe(dAtA []byte) error { } m.OffsetId = stringValue iNdEx = postIndex + case 9: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pkg/storeapi/store_api.pb.go b/pkg/storeapi/store_api.pb.go index 53e881042..e5b74bbb6 100644 --- a/pkg/storeapi/store_api.pb.go +++ b/pkg/storeapi/store_api.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.5 -// protoc v5.29.3 +// protoc v7.35.0 // source: storeapi/store_api.proto package storeapi @@ -447,6 +447,7 @@ type SearchRequest struct { Aggs []*AggQuery `protobuf:"bytes,12,rep,name=aggs,proto3" json:"aggs,omitempty"` Order Order `protobuf:"varint,13,opt,name=order,proto3,enum=api.Order" json:"order,omitempty"` OffsetId string `protobuf:"bytes,14,opt,name=offset_id,json=offsetId,proto3" json:"offset_id,omitempty"` + Downsample uint32 `protobuf:"varint,15,opt,name=downsample,proto3" json:"downsample,omitempty"` // If set, returns roughly 1 in N documents, statistically. 
unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -574,6 +575,13 @@ func (x *SearchRequest) GetOffsetId() string { return "" } +func (x *SearchRequest) GetDownsample() uint32 { + if x != nil { + return x.Downsample + } + return 0 +} + type SearchResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // Deprecated: Marked as deprecated in storeapi/store_api.proto. @@ -2172,7 +2180,7 @@ var file_storeapi_store_api_proto_rawDesc = string([]byte{ 0x0a, 0x09, 0x71, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x01, 0x52, 0x09, 0x71, 0x75, 0x61, 0x6e, 0x74, 0x69, 0x6c, 0x65, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, - 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x22, 0x85, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x22, 0xa5, 0x03, 0x0a, 0x0d, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x02, @@ -2197,7 +2205,9 @@ var file_storeapi_store_api_proto_rawDesc = string([]byte{ 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x05, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6f, 0x66, - 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x22, 0xe6, 0x09, 0x0a, 0x0e, 0x53, 0x65, 0x61, 0x72, 0x63, + 0x66, 0x73, 0x65, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x6f, 0x77, 0x6e, 0x73, 0x61, + 0x6d, 0x70, 0x6c, 0x65, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x64, 0x6f, 0x77, 0x6e, + 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x22, 0xe6, 0x09, 0x0a, 0x0e, 0x53, 0x65, 
0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x42, 0x02, 0x18, 0x01, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x3d, 0x0a, 0x0a, 0x69, 0x64, 0x5f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, diff --git a/pkg/storeapi/store_api_vtproto.pb.go b/pkg/storeapi/store_api_vtproto.pb.go index 73a4cce31..fe3d5dc18 100644 --- a/pkg/storeapi/store_api_vtproto.pb.go +++ b/pkg/storeapi/store_api_vtproto.pb.go @@ -121,6 +121,7 @@ func (m *SearchRequest) CloneVT() *SearchRequest { r.AggregationFilter = m.AggregationFilter r.Order = m.Order r.OffsetId = m.OffsetId + r.Downsample = m.Downsample if rhs := m.Aggs; rhs != nil { tmpContainer := make([]*AggQuery, len(rhs)) for k, v := range rhs { @@ -845,6 +846,9 @@ func (this *SearchRequest) EqualVT(that *SearchRequest) bool { if this.OffsetId != that.OffsetId { return false } + if this.Downsample != that.Downsample { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -2296,6 +2300,11 @@ func (m *SearchRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x78 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -4108,6 +4117,11 @@ func (m *SearchRequest) MarshalToSizedBufferVTStrict(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Downsample != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.Downsample)) + i-- + dAtA[i] = 0x78 + } if len(m.OffsetId) > 0 { i -= len(m.OffsetId) copy(dAtA[i:], m.OffsetId) @@ -5845,6 +5859,9 @@ func (m *SearchRequest) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.Downsample != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.Downsample)) + } n += len(m.unknownFields) 
return n } @@ -7229,6 +7246,25 @@ func (m *SearchRequest) UnmarshalVT(dAtA []byte) error { } m.OffsetId = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 15: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -12056,6 +12092,25 @@ func (m *SearchRequest) UnmarshalVTUnsafe(dAtA []byte) error { } m.OffsetId = stringValue iNdEx = postIndex + case 15: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Downsample", wireType) + } + m.Downsample = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Downsample |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/proxy/search/search_request.go b/proxy/search/search_request.go index 7aa468e24..5d6ba3fb3 100644 --- a/proxy/search/search_request.go +++ b/proxy/search/search_request.go @@ -28,21 +28,23 @@ type SearchRequest struct { WithTotal bool ShouldFetch bool Order seq.DocsOrder + Downsample uint32 } func (sr *SearchRequest) GetAPISearchRequest() *storeapi.SearchRequest { return &storeapi.SearchRequest{ - Query: util.ByteToStringUnsafe(sr.Q), - From: int64(seq.MIDToMillis(sr.From)), - To: int64(seq.MIDToMillis(sr.To)), - Size: int64(sr.Size), - Offset: int64(sr.Offset), - Interval: int64(seq.MIDToMillis(sr.Interval)), - OffsetId: sr.OffsetId, - Aggs: convertToAggsQuery(sr.AggQ), - Explain: sr.Explain, - WithTotal: sr.WithTotal, - Order: storeapi.MustProtoOrder(sr.Order), + 
Query: util.ByteToStringUnsafe(sr.Q), + From: int64(seq.MIDToMillis(sr.From)), + To: int64(seq.MIDToMillis(sr.To)), + Size: int64(sr.Size), + Offset: int64(sr.Offset), + Interval: int64(seq.MIDToMillis(sr.Interval)), + OffsetId: sr.OffsetId, + Aggs: convertToAggsQuery(sr.AggQ), + Explain: sr.Explain, + WithTotal: sr.WithTotal, + Order: storeapi.MustProtoOrder(sr.Order), + Downsample: sr.Downsample, } } diff --git a/proxyapi/grpc_search.go b/proxyapi/grpc_search.go index d5e658073..59c172343 100644 --- a/proxyapi/grpc_search.go +++ b/proxyapi/grpc_search.go @@ -20,12 +20,13 @@ func (g *grpcV1) Search( } proxyReq := &seqproxyapi.ComplexSearchRequest{ - Query: req.Query, - Size: req.Size, - Offset: req.Offset, - OffsetId: req.OffsetId, - WithTotal: req.WithTotal, - Order: req.Order, + Query: req.Query, + Size: req.Size, + Offset: req.Offset, + OffsetId: req.OffsetId, + WithTotal: req.WithTotal, + Order: req.Order, + Downsample: req.Downsample, } sResp, err := g.doSearch(ctx, proxyReq, true, nil) if err != nil { diff --git a/proxyapi/grpc_v1.go b/proxyapi/grpc_v1.go index 90611a293..fb14c14e9 100644 --- a/proxyapi/grpc_v1.go +++ b/proxyapi/grpc_v1.go @@ -242,6 +242,7 @@ func (g *grpcV1) doSearch( WithTotal: req.WithTotal, ShouldFetch: shouldFetch, Order: req.Order.MustDocsOrder(), + Downsample: req.Downsample, } if len(req.Aggs) > 0 { diff --git a/storeapi/grpc_search.go b/storeapi/grpc_search.go index e5eedd980..388312802 100644 --- a/storeapi/grpc_search.go +++ b/storeapi/grpc_search.go @@ -186,6 +186,7 @@ func (g *GrpcV1) doSearch( WithTotal: req.WithTotal, Order: req.Order.MustDocsOrder(), OffsetId: offsetId, + Downsample: req.Downsample, } searchTr := tr.NewChild("search iteratively") diff --git a/tests/integration_tests/integration_test.go b/tests/integration_tests/integration_test.go index d2abf8185..67efa3252 100644 --- a/tests/integration_tests/integration_test.go +++ b/tests/integration_tests/integration_test.go @@ -1316,6 +1316,85 @@ func 
TestBigWithReplicasIntegration(t *testing.T) { suite.Run(t, dd) } +func (s *IntegrationTestSuite) TestDownsamplePropagation() { + t := s.T() + r := require.New(t) + + env := setup.NewTestingEnv(s.Config) + defer env.StopAll() + + const ( + docsPerBulk = 200 + tolerancePercent = 0.5 + ) + + // Setup data with status field for aggregations. + bulksNum := getBulkIterationsNum(env) + totalDocs := docsPerBulk * bulksNum + + origDocs := make([]string, docsPerBulk) + for j := 0; j < bulksNum; j++ { + baseIdx := j * docsPerBulk + for i := range origDocs { + origDocs[i] = fmt.Sprintf(`{"service":"a","id":%d,"status":%d}`, baseIdx+i, i%3) + } + setup.Bulk(t, env.IngestorBulkAddr(), origDocs) + } + + type testCase struct { + name string + downsample *uint32 // nil = option not passed + wantAll bool // expect all documents returned + } + + cases := []testCase{{ + name: "no downsample option", + downsample: nil, + wantAll: true, + }, { + name: "downsample=0", + downsample: ptr[uint32](0), + wantAll: true, + }, { + name: "downsample=1", + downsample: ptr[uint32](1), + wantAll: true, + }, { + name: "downsample=10", + downsample: ptr[uint32](10), + wantAll: false, + }} + + for _, tc := range cases { + opts := []setup.SearchOption{setup.NoFetch(), setup.WithTotal(true)} + if tc.downsample != nil { + opts = append(opts, setup.WithDownsample(*tc.downsample)) + } + + qpr, _, _, err := env.Search(`service:a`, math.MaxInt32, opts...) + r.NoError(err, "store search with %s should succeed", tc.name) + + if tc.wantAll { + r.Equal(totalDocs, len(qpr.IDs), "store search %s: should return all %d docs", tc.name, totalDocs) + } else { + r.Greater(len(qpr.IDs), 0, "store search %s: should return at least some results", tc.name) + // downsample=N: expect approximately total/N docs with ±50% tolerance (tolerancePercent is a fraction of the expected count). 
+ ds := int(*tc.downsample) + delta := float64(totalDocs/ds) * tolerancePercent + r.InDelta(totalDocs/ds, len(qpr.IDs), delta, + "store search %s: should return ~%d docs", tc.name, totalDocs/ds) + } + + r.Equal(uint64(totalDocs), qpr.Total, + "store search %s: Total should reflect full count (%d)", tc.name, totalDocs) + } +} + +// ptr returns a pointer to the given value. +func ptr[T any](v T) *T { + return &v +} + func (s *IntegrationTestSuite) TestDocuments() { n := 32 env, origDocs := s.envWithDummyDocs(n) diff --git a/tests/setup/env.go b/tests/setup/env.go index a42ba006e..73b4eea20 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -573,6 +573,12 @@ func WithOrder(o seq.DocsOrder) SearchOption { } } +func WithDownsample(downsample uint32) SearchOption { + return func(sr *search.SearchRequest) { + sr.Downsample = downsample + } +} + func (t *TestingEnv) Search(q string, size int, options ...SearchOption) (*seq.QPR, [][]byte, time.Duration, error) { sr := &search.SearchRequest{ Explain: false,