Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func startStore(
SealParams: common.SealParams{
IDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
LIDsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
LIDBlockSize: cfg.Sealing.Lids.BlockSize,
TokenListZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
DocsPositionsZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
TokenTableZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ type Config struct {
SealingQueueLen int `config:"sealing_queue_len" default:"10"`
} `config:"storage"`

Sealing struct {
Lids struct {
// BlockSize sets max lids (postings) saved per LIDs block.
BlockSize int `config:"block_size" default:"65536"`
} `config:"lids"`
} `config:"sealing"`

Cluster struct {
// WriteStores contains cold store instances which will be written to.
WriteStores []string `config:"write_stores"`
Expand Down
14 changes: 14 additions & 0 deletions config/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (c *Config) storeValidations() []validateFn {

inRange("compression.sealed_zstd_compression_level", -7, 22, c.Compression.SealedZstdCompressionLevel),
inRange("compression.doc_block_zstd_compression_level", -7, 22, c.Compression.DocBlockZstdCompressionLevel),
greaterThan("sealing.lids.block_size", 0, c.Sealing.Lids.BlockSize),
lessOrEqThan("sealing.lids.block_size", 65536, c.Sealing.Lids.BlockSize),
inRange("offloading.queue_size_percent", 0, 100, c.Offloading.QueueSizePercent),

greaterThan("experimental.max_regex_tokens_check", -1, c.Experimental.MaxRegexTokensCheck),
Expand Down Expand Up @@ -106,6 +108,18 @@ func greaterThan[T cmp.Ordered](field string, base, v T) validateFn {
}
}

// lessOrEqThan builds a validateFn that checks that v does not exceed base.
//
// field is the configuration key reported in the error message. The returned
// function yields nil when v <= base and a descriptive error otherwise.
func lessOrEqThan[T cmp.Ordered](field string, base, v T) validateFn {
	return func() error {
		if v > base {
			// The check fails when v is too large, so the message must state
			// the upper bound, not the "greater than" wording used by greaterThan.
			return fmt.Errorf(
				"field %q must be less than or equal to %v",
				field, base,
			)
		}
		return nil
	}
}

func inRange[T cmp.Ordered](field string, from, to, v T) validateFn {
return func() error {
if v < from || to < v {
Expand Down
12 changes: 12 additions & 0 deletions config/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ limits:
env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "50"},
expectErr: false,
},
{
name: "Invalid sealing.lids.block_size",
cfg: baseCfg,
env: map[string]string{"SEQDB_SEALING_LIDS_BLOCK_SIZE": "-1"},
expectErr: true,
},
{
name: "Valid sealing.lids.block_size",
cfg: baseCfg,
env: map[string]string{"SEQDB_SEALING_LIDS_BLOCK_SIZE": "8192"},
expectErr: false,
},
}

for _, tt := range tests {
Expand Down
6 changes: 3 additions & 3 deletions consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ const (
// DummyMID is used in aggregations when we do not need to build time series.
DummyMID = 0

IDsPerBlock = int(4 * units.KiB)
LIDBlockCap = int(64 * units.KiB)
RegularBlockSize = int(16 * units.KiB)
IDsPerBlock = int(4 * units.KiB)
DefaultLIDBlockCap = int(64 * units.KiB)
RegularBlockSize = int(16 * units.KiB)

DefaultMaintenanceDelay = time.Second
DefaultCacheGCDelay = 1 * time.Second
Expand Down
18 changes: 14 additions & 4 deletions docs/en/02-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,22 @@ Resource allocation settings.

Compression level settings for various data types.

| Field | Type | Default | Description |
|------------------------------------------------|------|---------|--------------------------------------------------|
| `compression.docs_zstd_compression_level` | int | `1` | Zstandard compression level for documents |
| `compression.metas_zstd_compression_level` | int | `1` | Zstandard compression level for metadata |
| `compression.sealed_zstd_compression_level` | int | `3` | Zstandard compression level for sealed fractions |
| `compression.doc_block_zstd_compression_level` | int | `3` | Zstandard compression level for document blocks |

## Sealing Configuration

Settings for fraction sealing.

### Lids

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `compression.docs_zstd_compression_level` | int | `1` | Zstandard compression level for documents |
| `compression.metas_zstd_compression_level` | int | `1` | Zstandard compression level for metadata |
| `compression.sealed_zstd_compression_level` | int | `3` | Zstandard compression level for sealed fractions |
| `compression.doc_block_zstd_compression_level` | int | `3` | Zstandard compression level for document blocks |
| `sealing.lids.block_size` | int | `65536` | Max lids (postings) saved per LIDs block |

## Indexing Configuration

Expand Down
18 changes: 14 additions & 4 deletions docs/ru/02-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,22 @@ id: configuration

Настройки уровня сжатия для различных типов данных.

| Параметр | Тип | Значение по умолчанию | Описание |
|------------------------------------------------|-----|-----------------------|-----------------------------------------|
| `compression.docs_zstd_compression_level` | int | `1` | Уровень сжатия для документов |
| `compression.metas_zstd_compression_level` | int | `1` | Уровень сжатия для метаданных |
| `compression.sealed_zstd_compression_level` | int | `3` | Уровень сжатия для запечатанных фракций |
| `compression.doc_block_zstd_compression_level` | int | `3` | Уровень сжатия для блоков документов |

## Конфигурация запечатывания

Настройки запечатывания фракций.

### Lids

| Параметр | Тип | Значение по умолчанию | Описание |
|----------|-----|----------------------|-----------|
| `compression.docs_zstd_compression_level` | int | `1` | Уровень сжатия для документов |
| `compression.metas_zstd_compression_level` | int | `1` | Уровень сжатия для метаданных |
| `compression.sealed_zstd_compression_level` | int | `3` | Уровень сжатия для запечатанных фракций |
| `compression.doc_block_zstd_compression_level` | int | `3` | Уровень сжатия для блоков документов |
| `sealing.lids.block_size` | int | `65536` | Максимальное количество лидов в блоках |

## Конфигурация индексирования

Expand Down
2 changes: 1 addition & 1 deletion frac/common/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewInfo(filename string, docsOnDisk, metaOnDisk uint64) *Info {
CreationTime: uint64(time.Now().UnixMilli()),
ConstIDsPerBlock: consts.IDsPerBlock,
ConstRegularBlockSize: consts.RegularBlockSize,
ConstLIDBlockCap: consts.LIDBlockCap,
ConstLIDBlockCap: consts.DefaultLIDBlockCap,
DocsOnDisk: docsOnDisk,
MetaOnDisk: metaOnDisk,
}
Expand Down
1 change: 1 addition & 0 deletions frac/common/seal_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ type SealParams struct {
TokenTableZstdLevel int

DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block.
LIDBlockSize int
DocBlockSize int // DocBlockSize is decompressed payload size of document block.
}
1 change: 1 addition & 0 deletions frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func seal(active *Active) (*Sealed, error) {
TokenTableZstdLevel: 1,
DocBlocksZstdLevel: 1,
DocBlockSize: 128 * int(units.KiB),
LIDBlockSize: 512,
}
activeSealingSource, err := NewActiveSealingSource(active, sealParams)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (s *FractionTestSuite) SetupTestCommon() {
DocsPositionsZstdLevel: 1,
TokenTableZstdLevel: 1,
DocBlocksZstdLevel: 1,
LIDBlockSize: 512,
DocBlockSize: 128 * int(units.KiB),
}

Expand Down
6 changes: 3 additions & 3 deletions frac/processor/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ var searchBuffersPool = sync.Pool{
return &searchBuffers{
// Currently, we drain up to 4k lids from the eval tree, but with proper batching enabled
// we can get as many as a whole LID block can hold (currently, up to 64k lids)
lids: make([]node.LID, 0, consts.LIDBlockCap),
mids: make([]seq.MID, 0, consts.LIDBlockCap),
rids: make([]seq.RID, 0, consts.LIDBlockCap),
lids: make([]node.LID, 0, consts.DefaultLIDBlockCap),
mids: make([]seq.MID, 0, consts.DefaultLIDBlockCap),
rids: make([]seq.RID, 0, consts.DefaultLIDBlockCap),
}
},
}
Expand Down
4 changes: 2 additions & 2 deletions frac/sealed/lids/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ func (b *UnpackBuffer) Reset(fracVer config.BinaryDataVersion) {
}
if fracVer >= config.BinaryDataV4 {
if b.decompressed == nil {
b.decompressed = make([]uint32, 0, consts.LIDBlockCap)
b.decompressed = make([]uint32, 0, consts.DefaultLIDBlockCap)
} else {
b.decompressed = b.decompressed[:0]
}
if b.compressed == nil {
b.compressed = make([]uint32, 0, consts.LIDBlockCap)
b.compressed = make([]uint32, 0, consts.DefaultLIDBlockCap)
} else {
b.compressed = b.compressed[:0]
}
Expand Down
4 changes: 4 additions & 0 deletions frac/sealed/sealing/blocks_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"iter"
"unsafe"

"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac/sealed/lids"
"github.com/ozontech/seq-db/frac/sealed/seqids"
"github.com/ozontech/seq-db/frac/sealed/token"
Expand Down Expand Up @@ -217,6 +218,9 @@ func newLIDAccumulator(
blockCapacity int,
onBlock func(lidsSealBlock) error,
) *lidAccumulator {
if blockCapacity == 0 {
blockCapacity = consts.DefaultLIDBlockCap
}
a := &lidAccumulator{
blockCapacity: blockCapacity,
onBlock: onBlock,
Expand Down
4 changes: 2 additions & 2 deletions frac/sealed/sealing/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewIndexSealer(params common.SealParams) *IndexSealer {
params: params,
buf1: make([]byte, 0, consts.RegularBlockSize),
buf2: make([]byte, 0, consts.RegularBlockSize),
buf32: make([]uint32, 0, consts.LIDBlockCap),
buf32: make([]uint32, 0, consts.DefaultLIDBlockCap),
buf64: make([]uint64, 0, consts.RegularBlockSize),
}
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err
)

lidAccumulator := newLIDAccumulator(
consts.LIDBlockCap,
s.params.LIDBlockSize,
func(block lidsSealBlock) error {
return lw.writeBlock(blockTypeLID, s.packLIDsBlock(block))
},
Expand Down
3 changes: 3 additions & 0 deletions fracmanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func FillConfigWithDefault(config *Config) *Config {
if config.SealParams.LIDsZstdLevel == 0 {
config.SealParams.LIDsZstdLevel = zstdDefaultLevel
}
if config.SealParams.LIDBlockSize == 0 {
config.SealParams.LIDBlockSize = consts.DefaultLIDBlockCap
}
if config.SealParams.TokenListZstdLevel == 0 {
config.SealParams.TokenListZstdLevel = zstdDefaultLevel
}
Expand Down
1 change: 1 addition & 0 deletions fracmanager/fraction_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func setupS3Client(t testing.TB) (*s3.Client, func()) {

func setupFractionProvider(t testing.TB, cfg *Config) (*fractionProvider, func()) {
cfg = setupDataDir(t, cfg)
cfg.SealParams.LIDBlockSize = int(64 * units.KB)
rl := storage.NewReadLimiter(1, nil)
s3cli, stopS3 := setupS3Client(t)
idx, stopIdx := frac.NewActiveIndexer(1, 1)
Expand Down
Loading