Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c092251
feat(fracmanager): implement fraction snapshots with wait group refer…
eguguchkin Mar 27, 2026
e447d7f
review fixes
eguguchkin May 20, 2026
69c1e69
feat: new api for compaction
dkharms May 21, 2026
dcdeec2
chore: remove garbage code
dkharms May 22, 2026
96ba81f
chore: remove evict local fix
dkharms May 22, 2026
0978ee4
refactor: introduce `blockbuilder` and `indexwriter` package
dkharms Apr 27, 2026
3abf863
refactor: filename similar to package name
dkharms Apr 27, 2026
6bf88e3
refactor: remove `BlockBuilder` type
dkharms Apr 27, 2026
b9f77b8
refactor: move unexported functions
dkharms Apr 27, 2026
bb339f6
chore: fix rebase conflicts
dkharms May 7, 2026
7484ac0
refactor: merge `indexwriter` and `blockbuilder`
dkharms May 7, 2026
e0dfb13
chore: fix rebase conflicts
dkharms May 25, 2026
8bc6dff
chore: do not export some methods
dkharms May 25, 2026
c2c3fce
feat: k-way fraction merge
dkharms Apr 9, 2026
eac4423
feat: calculate information correctly
dkharms Apr 10, 2026
465bc20
feat: use linear scan for k-way merge
dkharms Apr 13, 2026
9944b81
fix: calculate offsets and info once
dkharms Apr 27, 2026
21a854b
feat: build distribution for compacted fraction
dkharms Apr 27, 2026
d3e1c9e
refactor: consistent naming
dkharms Apr 28, 2026
1cb6543
feat: implement `stcs`
dkharms May 25, 2026
3118a7d
feat: add `FractionName` method for `FracManager`
dkharms May 27, 2026
e0628dd
feat: first iteration on `planner`
dkharms May 27, 2026
688cb8c
feat: first iteration on `Executor`
dkharms May 27, 2026
14001ce
feat: add compaction executor startup
dkharms May 27, 2026
b7ae92a
refactor: use local `fraction` interface
dkharms May 27, 2026
ec7be15
refactor: move to `frac_test` package
dkharms May 27, 2026
65b228e
feat: rotate fraction based on in-memory size
dkharms May 7, 2026
d33fd80
chore: frac-size=1MiB
dkharms May 27, 2026
fdde91d
chore: group by creation time
dkharms May 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .seqbench/comparison.env
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
GOGC=100

SEQDB_STORAGE_FRAC_SIZE=16MiB
SEQDB_STORAGE_FRAC_SIZE=1MiB
SEQDB_STORAGE_TOTAL_SIZE=10GiB

SEQDB_LIMITS_QUERY_RATE=1024
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ test-deps:

.PHONY: test
test: test-deps
LOG_LEVEL=ERROR go test ./... -count 1
LOG_LEVEL=ERROR go test ./... -count 1 -v

.bin-deps: export GOBIN := $(LOCAL_BIN)
.bin-deps:
Expand Down
27 changes: 15 additions & 12 deletions asyncsearcher/async_searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ type AsyncSearcherConfig struct {
}

type fractionAcquirer interface {
Fractions() fracmanager.List
AcquireFractions() (_ fracmanager.List, release func())
AcquireFraction(name string) (_ frac.Fraction, release func(), ok bool)
}

func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fractionAcquirer) *AsyncSearcher {
func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracProvider fractionAcquirer) *AsyncSearcher {
if config.DataDir == "" {
logger.Fatal("can't start async searcher: DataDir is empty")
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fracti
for _, id := range notProcessedIDs {
asyncSearchActiveSearches.Add(1)
as.processWg.Add(1)
go as.processRequest(id, fracs)
go as.processRequest(id, fracProvider)
}

// set limit metrics that allow us to calculate alerts' thresholds
Expand Down Expand Up @@ -209,7 +209,7 @@ func (i *asyncSearchInfo) Status() AsyncSearchStatus {
return status
}

func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fractionAcquirer) error {
func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracProvider fractionAcquirer) error {
if as.readOnly.Load() {
return fmt.Errorf("cannot start search on read-only mode")
}
Expand Down Expand Up @@ -240,14 +240,17 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fractionAcquire
return fmt.Errorf("retention time should be less than %s, got %s", maxRetention, r.Retention)
}

fracNames := fracs.Fractions().FilterInRange(r.Params.From, r.Params.To).Names()
fracs, release := fracProvider.AcquireFractions()
defer release()

fracNames := fracs.FilterInRange(r.Params.From, r.Params.To).Names()
if ok := as.saveSearchInfo(r, fracNames); !ok {
// Request was saved previously, skip it
return nil
}
asyncSearchActiveSearches.Add(1)
as.processWg.Add(1)
go as.processRequest(r.ID, fracs)
go as.processRequest(r.ID, fracProvider)
return nil
}

Expand Down Expand Up @@ -301,17 +304,17 @@ func (as *AsyncSearcher) createDataDir() {
})
}

func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fractionAcquirer) {
func (as *AsyncSearcher) processRequest(asyncSearchID string, fracProvider fractionAcquirer) {
defer as.processWg.Done()

as.rateLimit <- struct{}{}
defer func() { <-as.rateLimit }()

as.doSearch(asyncSearchID, fracs)
as.doSearch(asyncSearchID, fracProvider)
asyncSearchActiveSearches.Add(-1)
}

func (as *AsyncSearcher) doSearch(id string, fracs fractionAcquirer) {
func (as *AsyncSearcher) doSearch(id string, fracProvider fractionAcquirer) {
qprPaths, err := as.findQPRs(id)
if err != nil {
panic(fmt.Errorf("can't find QPRs for id %q: %s", id, err))
Expand Down Expand Up @@ -353,7 +356,7 @@ func (as *AsyncSearcher) doSearch(id string, fracs fractionAcquirer) {
if as.shouldStopSearch(id) {
break
}
if err := as.acquireAndProcessFrac(fracInfo, info, fracs); err != nil {
if err := as.acquireAndProcessFrac(fracInfo, info, fracProvider); err != nil {
as.updateSearchInfo(id, func(info *asyncSearchInfo) {
info.Error = err.Error()
})
Expand Down Expand Up @@ -395,8 +398,8 @@ func compressQPR(qpr *seq.QPR, cb func(compressed []byte) error) error {
return nil
}

func (as *AsyncSearcher) acquireAndProcessFrac(fracInfo fracSearchState, searchInfo asyncSearchInfo, fracs fractionAcquirer) (err error) {
f, release, ok := fracs.AcquireFraction(fracInfo.Name)
func (as *AsyncSearcher) acquireAndProcessFrac(fracInfo fracSearchState, searchInfo asyncSearchInfo, fracProvider fractionAcquirer) (err error) {
f, release, ok := fracProvider.AcquireFraction(fracInfo.Name)
if !ok { // oldest fracs may already be removed
logger.Info(
"async search: skip missing fraction",
Expand Down
4 changes: 2 additions & 2 deletions asyncsearcher/async_searcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (fp fakeFractionProvider) AcquireFraction(name string) (frac.Fraction, func
return nil, func() {}, false
}

func (fp fakeFractionProvider) Fractions() fracmanager.List {
return fracmanager.List(fp)
func (fp fakeFractionProvider) AcquireFractions() (fracmanager.List, func()) {
return fracmanager.List(fp), func() {}
}

func TestAsyncSearcherMaintain(t *testing.T) {
Expand Down
65 changes: 65 additions & 0 deletions compaction/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package compaction

import (
"sync"

"go.uber.org/zap"

"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/logger"
)

type Executor struct {
workers int
wg sync.WaitGroup
p *planner
}

// FIXME(dkharms): I need to pass here [common.SealParams].
func NewExecutor(workers int, p *planner) *Executor {
e := Executor{workers: workers, p: p}
e.init()
return &e
}

func (e *Executor) Close() {
e.p.close()
e.wg.Wait()
}

func (e *Executor) init() {
for range e.workers {
e.wg.Go(func() {
for t := range e.p.tasks {
logger.Info(
"got new compaction task",
zap.Time("bin", t.bin),
zap.Any("snapshot", t.snapshot),
)
t.onComplete(e.compact(t))
}
})
}
}

func (e *Executor) compact(t task) (*sealed.PreloadedData, error) {
var (
names []string
srcs []Source
)

for _, f := range t.snapshot.Fractions() {
names = append(names, f.Info().Name())
srcs = append(srcs, frac.NewSealedSource(f))
}

logger.Info(
"compacting fractions",
zap.Strings("names", names),
)

preloaded, err := Merge(t.filename, common.SealParams{}, srcs...)
return preloaded, err
}
162 changes: 162 additions & 0 deletions compaction/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package compaction

import (
"errors"
"os"

"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/indexwriter"
)

func Merge(filename string, params common.SealParams, srcs ...Source) (*sealed.PreloadedData, error) {
writer := indexwriter.New(params)
src := NewMergeSource(filename, srcs)

if err := createAndWrite(
filename+consts.OffsetsTmpFileSuffix,
filename+consts.OffsetsFileSuffix,
func(f *os.File) error { return writer.WriteOffsetsFile(f, src) },
); err != nil {
return nil, err
}

if err := createAndWrite(
filename+consts.IDTmpFileSuffix,
filename+consts.IDFileSuffix,
func(f *os.File) error { return writer.WriteIDFile(f, src) },
); err != nil {
return nil, err
}

if err := createAndWriteBoth(
filename+consts.TokenTmpFileSuffix,
filename+consts.TokenFileSuffix,
filename+consts.LIDTmpFileSuffix,
filename+consts.LIDFileSuffix,
func(tf, lf *os.File) error { return writer.WriteTokenTriplet(tf, lf, src) },
); err != nil {
return nil, err
}

if err := createAndWrite(
filename+consts.InfoTmpFileSuffix,
filename+consts.InfoFileSuffix,
func(f *os.File) error { return writer.WriteInfoFile(f, src) },
); err != nil {
return nil, err
}

if err := mergeDocs(filename, srcs...); err != nil {
return nil, err
}

info := src.Info()
info.IndexOnDisk = 0

for _, suffix := range []string{
consts.InfoFileSuffix,
consts.TokenFileSuffix,
consts.OffsetsFileSuffix,
consts.IDFileSuffix,
consts.LIDFileSuffix,
} {
st, err := os.Stat(info.Path + suffix)
if err != nil {
return nil, err
}
info.IndexOnDisk += uint64(st.Size())
}

lidsTable := writer.LIDsTable()
preloaded := &sealed.PreloadedData{
Info: info,
TokenTable: writer.TokenTable(),
BlocksData: sealed.BlocksData{
LIDsTable: &lidsTable,
IDsTable: writer.IDsTable(),
BlocksOffsets: src.BlockOffsets(),
},
}

return preloaded, nil
}

func mergeDocs(filename string, srcs ...Source) error {
return createAndWrite(
filename+consts.DocsTmpFileSuffix,
filename+consts.DocsFileSuffix,
func(f *os.File) error {
var docsSize uint64
for _, src := range srcs {
for loc, err := range src.DocBlock() {
if err != nil {
return err
}

payload, offset := loc.First, loc.Second
if _, err := f.WriteAt(payload, int64(offset+docsSize)); err != nil {
return err
}
}

docsSize += src.Info().DocsOnDisk
}

return nil
},
)
}

func syncAndClose(f *os.File) error {
if err := f.Sync(); err != nil {
f.Close()
return err
}
return f.Close()
}

func createAndWrite(
tmp, final string,
write func(*os.File) error,
) error {
f, err := os.Create(tmp)
if err != nil {
return err
}

if err := errors.Join(write(f), syncAndClose(f)); err != nil {
return err
}

return os.Rename(tmp, final)
}

func createAndWriteBoth(
atmp, afinal,
btmp, bfinal string,
write func(*os.File, *os.File) error,
) error {
a, err := os.Create(atmp)
if err != nil {
return err
}

b, err := os.Create(btmp)
if err != nil {
a.Close()
return err
}

writeErr := write(a, b)
if err := errors.Join(writeErr, syncAndClose(a), syncAndClose(b)); err != nil {
return err
}

if err := os.Rename(atmp, afinal); err != nil {
return err
}

return os.Rename(btmp, bfinal)
}
Loading