diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index b0a7c47e..de29ac4c 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -266,6 +266,7 @@ func startStore( SealParams: common.SealParams{ IDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel, LIDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel, + LIDBlockSize: cfg.Sealing.Lids.BlockSize, TokenListZstdLevel: cfg.Compression.SealedZstdCompressionLevel, DocsPositionsZstdLevel: cfg.Compression.SealedZstdCompressionLevel, TokenTableZstdLevel: cfg.Compression.SealedZstdCompressionLevel, diff --git a/config/config.go b/config/config.go index 28a67dc0..0d929a7f 100644 --- a/config/config.go +++ b/config/config.go @@ -70,6 +70,13 @@ type Config struct { SealingQueueLen int `config:"sealing_queue_len" default:"10"` } `config:"storage"` + Sealing struct { + Lids struct { + // BlockSize sets max lids (postings) saved per LIDs block. + BlockSize int `config:"block_size" default:"65536"` + } `config:"lids"` + } `config:"sealing"` + Cluster struct { // WriteStores contains cold store instances which will be written to. WriteStores []string `config:"write_stores"` diff --git a/config/validation.go b/config/validation.go index 15d63c9b..c305a706 100644 --- a/config/validation.go +++ b/config/validation.go @@ -68,6 +68,8 @@ func (c *Config) storeValidations() []validateFn { inRange("compression.sealed_zstd_compression_level", -7, 22, c.Compression.SealedZstdCompressionLevel), inRange("compression.doc_block_zstd_compression_level", -7, 22, c.Compression.DocBlockZstdCompressionLevel), + greaterThan("sealing.lids.block_size", 0, c.Sealing.Lids.BlockSize), + lessOrEqThan("sealing.lids.block_size", 65536, c.Sealing.Lids.BlockSize), inRange("offloading.queue_size_percent", 0, 100, c.Offloading.QueueSizePercent), greaterThan("experimental.max_regex_tokens_check", -1, c.Experimental.MaxRegexTokensCheck), @@ -106,6 +108,18 @@ func greaterThan[T cmp.Ordered](field string, base, v T) validateFn { } } +func lessOrEqThan[T cmp.Ordered](field string, base, v T) validateFn { + return func() error { + if v > base { + return fmt.Errorf( + "field %q must be greater than %v", + field, base, + ) + } + return nil + } +} + func inRange[T cmp.Ordered](field string, from, to, v T) validateFn { return func() error { if v < from || to < v { diff --git a/config/validation_test.go b/config/validation_test.go index 0a29f990..b0813697 100644 --- a/config/validation_test.go +++ b/config/validation_test.go @@ -86,6 +86,18 @@ limits: env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "50"}, expectErr: false, }, + { + name: "Invalid sealing.lids.block_size", + cfg: baseCfg, + env: map[string]string{"SEQDB_SEALING_LIDS_BLOCK_SIZE": "-1"}, + expectErr: true, + }, + { + name: "Valid sealing.lids.block_size", + cfg: baseCfg, + env: map[string]string{"SEQDB_SEALING_LIDS_BLOCK_SIZE": "8192"}, + expectErr: false, + }, } for _, tt := range tests { diff --git a/consts/consts.go b/consts/consts.go index 80aabbe3..ccaba4e2 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -11,9 +11,9 @@ const ( // DummyMID is used in aggregations when we do not need to build time series. DummyMID = 0 - IDsPerBlock = int(4 * units.KiB) - LIDBlockCap = int(64 * units.KiB) - RegularBlockSize = int(16 * units.KiB) + IDsPerBlock = int(4 * units.KiB) + DefaultLIDBlockCap = int(64 * units.KiB) + RegularBlockSize = int(16 * units.KiB) DefaultMaintenanceDelay = time.Second DefaultCacheGCDelay = 1 * time.Second diff --git a/docs/en/02-configuration.md b/docs/en/02-configuration.md index 54b798df..2be41678 100644 --- a/docs/en/02-configuration.md +++ b/docs/en/02-configuration.md @@ -104,12 +104,22 @@ Resource allocation settings. Compression level settings for various data types. +| Field | Type | Default | Description | +|------------------------------------------------|------|---------|--------------------------------------------------| +| `compression.docs_zstd_compression_level` | int | `1` | Zstandard compression level for documents | +| `compression.metas_zstd_compression_level` | int | `1` | Zstandard compression level for metadata | +| `compression.sealed_zstd_compression_level` | int | `3` | Zstandard compression level for sealed fractions | +| `compression.doc_block_zstd_compression_level` | int | `3` | Zstandard compression level for document blocks | + +## Sealing Configuration + +Settings for fraction sealing. + +### Lids + | Field | Type | Default | Description | |-------|------|---------|-------------| -| `compression.docs_zstd_compression_level` | int | `1` | Zstandard compression level for documents | -| `compression.metas_zstd_compression_level` | int | `1` | Zstandard compression level for metadata | -| `compression.sealed_zstd_compression_level` | int | `3` | Zstandard compression level for sealed fractions | -| `compression.doc_block_zstd_compression_level` | int | `3` | Zstandard compression level for document blocks | +| `sealing.lids.block_size` | int | `65536` | Max lids (postings) saved per LIDs block | ## Indexing Configuration diff --git a/docs/ru/02-configuration.md b/docs/ru/02-configuration.md index 664530aa..6b91ff14 100644 --- a/docs/ru/02-configuration.md +++ b/docs/ru/02-configuration.md @@ -104,12 +104,22 @@ id: configuration Настройки уровня сжатия для различных типов данных. +| Параметр | Тип | Значение по умолчанию | Описание | +|------------------------------------------------|-----|-----------------------|-----------------------------------------| +| `compression.docs_zstd_compression_level` | int | `1` | Уровень сжатия для документов | +| `compression.metas_zstd_compression_level` | int | `1` | Уровень сжатия для метаданных | +| `compression.sealed_zstd_compression_level` | int | `3` | Уровень сжатия для запечатанных фракций | +| `compression.doc_block_zstd_compression_level` | int | `3` | Уровень сжатия для блоков документов | + +## Конфигурация запечатывания + +Настройки запечатывания фракций. + +### Lids + | Параметр | Тип | Значение по умолчанию | Описание | |----------|-----|----------------------|-----------| -| `compression.docs_zstd_compression_level` | int | `1` | Уровень сжатия для документов | -| `compression.metas_zstd_compression_level` | int | `1` | Уровень сжатия для метаданных | -| `compression.sealed_zstd_compression_level` | int | `3` | Уровень сжатия для запечатанных фракций | -| `compression.doc_block_zstd_compression_level` | int | `3` | Уровень сжатия для блоков документов | +| `sealing.lids.block_size` | int | `65536` | Максимальное количество лидов в блоках | ## Конфигурация индексирования diff --git a/frac/common/info.go b/frac/common/info.go index 20e7f7c2..b82f6b99 100644 --- a/frac/common/info.go +++ b/frac/common/info.go @@ -52,7 +52,7 @@ func NewInfo(filename string, docsOnDisk, metaOnDisk uint64) *Info { CreationTime: uint64(time.Now().UnixMilli()), ConstIDsPerBlock: consts.IDsPerBlock, ConstRegularBlockSize: consts.RegularBlockSize, - ConstLIDBlockCap: consts.LIDBlockCap, + ConstLIDBlockCap: consts.DefaultLIDBlockCap, DocsOnDisk: docsOnDisk, MetaOnDisk: metaOnDisk, } diff --git a/frac/common/seal_params.go b/frac/common/seal_params.go index c19365f9..05f89696 100644 --- a/frac/common/seal_params.go +++ b/frac/common/seal_params.go @@ -8,5 +8,6 @@ type SealParams struct { TokenTableZstdLevel int DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block. + LIDBlockSize int DocBlockSize int // DocBlockSize is decompressed payload size of document block. } diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index fd970ddb..560760cd 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -341,6 +341,7 @@ func seal(active *Active) (*Sealed, error) { TokenTableZstdLevel: 1, DocBlocksZstdLevel: 1, DocBlockSize: 128 * int(units.KiB), + LIDBlockSize: 512, } activeSealingSource, err := NewActiveSealingSource(active, sealParams) if err != nil { diff --git a/frac/fraction_test.go b/frac/fraction_test.go index b09b5e9e..0d1bfb66 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -98,6 +98,7 @@ func (s *FractionTestSuite) SetupTestCommon() { DocsPositionsZstdLevel: 1, TokenTableZstdLevel: 1, DocBlocksZstdLevel: 1, + LIDBlockSize: 512, DocBlockSize: 128 * int(units.KiB), } diff --git a/frac/processor/search.go b/frac/processor/search.go index d50d930f..30ed4f45 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -53,9 +53,9 @@ var searchBuffersPool = sync.Pool{ return &searchBuffers{ // Currently, we drain up to 4k lids from eval tree, but with proper batching enabled // we can get as much as whole LID block can have (currently, 64k lids) - lids: make([]node.LID, 0, consts.LIDBlockCap), - mids: make([]seq.MID, 0, consts.LIDBlockCap), - rids: make([]seq.RID, 0, consts.LIDBlockCap), + lids: make([]node.LID, 0, consts.DefaultLIDBlockCap), + mids: make([]seq.MID, 0, consts.DefaultLIDBlockCap), + rids: make([]seq.RID, 0, consts.DefaultLIDBlockCap), } }, } diff --git a/frac/sealed/lids/loader.go b/frac/sealed/lids/loader.go index 80a9a854..cf987a97 100644 --- a/frac/sealed/lids/loader.go +++ b/frac/sealed/lids/loader.go @@ -23,12 +23,12 @@ func (b *UnpackBuffer) Reset(fracVer config.BinaryDataVersion) { } if fracVer >= config.BinaryDataV4 { if b.decompressed == nil { - b.decompressed = make([]uint32, 0, consts.LIDBlockCap) + b.decompressed = make([]uint32, 0, consts.DefaultLIDBlockCap) } else { b.decompressed = b.decompressed[:0] } if b.compressed == nil { - b.compressed = make([]uint32, 0, consts.LIDBlockCap) + b.compressed = make([]uint32, 0, consts.DefaultLIDBlockCap) } else { b.compressed = b.compressed[:0] } diff --git a/frac/sealed/sealing/blocks_builder.go b/frac/sealed/sealing/blocks_builder.go index fc069cbf..3c6ce1b0 100644 --- a/frac/sealed/sealing/blocks_builder.go +++ b/frac/sealed/sealing/blocks_builder.go @@ -5,6 +5,7 @@ import ( "iter" "unsafe" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/seqids" "github.com/ozontech/seq-db/frac/sealed/token" @@ -217,6 +218,9 @@ func newLIDAccumulator( blockCapacity int, onBlock func(lidsSealBlock) error, ) *lidAccumulator { + if blockCapacity == 0 { + blockCapacity = consts.DefaultLIDBlockCap + } a := &lidAccumulator{ blockCapacity: blockCapacity, onBlock: onBlock, diff --git a/frac/sealed/sealing/index.go b/frac/sealed/sealing/index.go index 7d3dacfe..e14b36b9 100644 --- a/frac/sealed/sealing/index.go +++ b/frac/sealed/sealing/index.go @@ -45,7 +45,7 @@ func NewIndexSealer(params common.SealParams) *IndexSealer { params: params, buf1: make([]byte, 0, consts.RegularBlockSize), buf2: make([]byte, 0, consts.RegularBlockSize), - buf32: make([]uint32, 0, consts.LIDBlockCap), + buf32: make([]uint32, 0, consts.DefaultLIDBlockCap), buf64: make([]uint64, 0, consts.RegularBlockSize), } } @@ -129,7 +129,7 @@ func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err ) lidAccumulator := newLIDAccumulator( - consts.LIDBlockCap, + s.params.LIDBlockSize, func(block lidsSealBlock) error { return lw.writeBlock(blockTypeLID, s.packLIDsBlock(block)) }, diff --git a/fracmanager/config.go b/fracmanager/config.go index 5a9790ac..e295aada 100644 --- a/fracmanager/config.go +++ b/fracmanager/config.go @@ -57,6 +57,9 @@ func FillConfigWithDefault(config *Config) *Config { if config.SealParams.LIDsZstdLevel == 0 { config.SealParams.LIDsZstdLevel = zstdDefaultLevel } + if config.SealParams.LIDBlockSize == 0 { + config.SealParams.LIDBlockSize = consts.DefaultLIDBlockCap + } if config.SealParams.TokenListZstdLevel == 0 { config.SealParams.TokenListZstdLevel = zstdDefaultLevel } diff --git a/fracmanager/fraction_provider_test.go b/fracmanager/fraction_provider_test.go index aae4e820..f350d3ee 100644 --- a/fracmanager/fraction_provider_test.go +++ b/fracmanager/fraction_provider_test.go @@ -34,6 +34,7 @@ func setupS3Client(t testing.TB) (*s3.Client, func()) { func setupFractionProvider(t testing.TB, cfg *Config) (*fractionProvider, func()) { cfg = setupDataDir(t, cfg) + cfg.SealParams.LIDBlockSize = int(64 * units.KB) rl := storage.NewReadLimiter(1, nil) s3cli, stopS3 := setupS3Client(t) idx, stopIdx := frac.NewActiveIndexer(1, 1)