diff --git a/consts/consts.go b/consts/consts.go index ccaba4e2..ad1c8904 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -84,7 +84,8 @@ const ( // We can remove it in the future releases. IndexDelFileSuffix = ".index.del" - RemoteFractionSuffix = ".remote" + RemoteFractionSuffix = ".remote" + RemoteFractionInfoSuffix = ".remote-info" FracCacheFileSuffix = ".frac-cache" diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 244aeb99..b9c2a190 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -2347,7 +2347,6 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal nil, s.config, testSkipMaskProvider{}, - false, ) s.fraction = sealed @@ -2409,7 +2408,6 @@ func (s *RemoteFractionTestSuite) SetupTest() { s.config, s3cli, testSkipMaskProvider{}, - false, ) s.fraction = remoteFrac diff --git a/frac/remote.go b/frac/remote.go index 9630b8ca..8b7bdf8a 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -3,12 +3,14 @@ package frac import ( "context" "fmt" + "os" "path/filepath" "sync" "go.uber.org/zap" "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" @@ -43,8 +45,6 @@ type Remote struct { docsCache *cache.Cache[[]byte] docsReader storage.DocsReader - // IsLegacy is true for fractions that use the old single .index file format. 
- IsLegacy bool legacyFile storage.ImmutableFile legacyReader storage.IndexReader @@ -79,10 +79,9 @@ func NewRemote( indexCache *IndexCache, docsCache *cache.Cache[[]byte], info *common.Info, - config *Config, + cfg *Config, s3cli *s3.Client, skipMaskProvider skipMaskProvider, - isLegacy bool, ) *Remote { f := &Remote{ ctx: ctx, @@ -95,12 +94,10 @@ func NewRemote( info: info, BaseFileName: baseFile, - Config: config, + Config: cfg, s3cli: s3cli, skipMaskProvider: skipMaskProvider, - - IsLegacy: isLegacy, } // Fast path if fraction-info cache exists AND it has valid index size. @@ -116,7 +113,7 @@ func NewRemote( // https://github.com/ozontech/seq-db/issues/92 if err := f.loadInfo(); err != nil { - logger.Error( + logger.Fatal( "cannot open info file: any subsequent operation will fail", zap.String("fraction", filepath.Base(f.BaseFileName)), zap.Error(err), @@ -175,7 +172,8 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e lidReader := &f.lidReader idReader := &f.idReader - if f.IsLegacy { + isLegacy := f.IsSingleIndex() + if isLegacy { tokenReader = &f.legacyReader lidReader = &f.legacyReader idReader = &f.legacyReader @@ -192,7 +190,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(f.info.BinaryDataVer, lidReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), - tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), + tokenTableLoader: token.NewTableLoader(f.BaseFileName, isLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( @@ -219,6 +217,7 @@ func (f *Remote) Suicide() { // FIXME(dkharms): We need to rename `.remote` file to `._remote` to commit deletion intent. // Now, we might have fraction leaks in S3 storage since [Suicide] is not atomic. 
util.MustRemoveFileByPath(f.BaseFileName + consts.RemoteFractionSuffix) + util.MustRemoveFileByPath(f.BaseFileName + consts.RemoteFractionInfoSuffix) f.docsCache.Release() f.indexCache.Release() @@ -252,27 +251,60 @@ func (f *Remote) String() string { return fracToString(f, "remote") } +func (f *Remote) IsSingleIndex() bool { + return f.info.BinaryDataVer < config.BinaryDataV3 +} + func (f *Remote) loadInfo() error { - var err error + err := f.tryLoadInfoLocal() + if err == nil { + return nil + } - if f.IsLegacy { - if err := f.openInfoLegacy(); err != nil { - return err - } - if f.info, err = loadInfoLegacy(f.legacyReader); err != nil { - logger.Fatal("error loading Info", zap.String("fraction", f.BaseFileName), zap.Error(err)) - } + logger.Warn( + "cannot open local info file for remote fraction, falling back to S3", + zap.String("fraction", f.BaseFileName), + zap.Error(err), + ) + + err = f.tryLoadInfoRemote() + if err == nil { return nil } - if err := f.openInfo(); err != nil { - return err + logger.Warn( + "cannot open remote info file, falling back to legacy index", + zap.String("fraction", f.BaseFileName), + zap.Error(err), + ) + + return f.loadInfoLegacy() +} + +func (f *Remote) loadInfoLegacy() error { + err := f.openIndexLegacyRemote() + if err == nil { + f.info, err = loadInfoLegacy(f.legacyReader) + } + return err +} + +func (f *Remote) tryLoadInfoLocal() error { + remoteInfoPath := f.BaseFileName + consts.RemoteFractionInfoSuffix + file, err := os.Open(remoteInfoPath) + if err == nil { + defer file.Close() + f.info, err = loadInfo(file) } + return err +} - if f.info, err = loadInfo(f.infoFile); err != nil { - logger.Fatal("error loading Info", zap.String("fraction", f.BaseFileName), zap.Error(err)) +func (f *Remote) tryLoadInfoRemote() error { + err := f.openInfoRemote() + if err == nil { + f.info, err = loadInfo(f.infoFile) } - return nil + return err } func (f *Remote) init() error { @@ -291,7 +323,7 @@ func (f *Remote) init() error { return nil } - 
if f.IsLegacy { + if f.IsSingleIndex() { (&LegacyLoader{}).Load(&f.blocksData, f.info, f.legacyReader) f.isInited = true return nil @@ -308,44 +340,36 @@ func (f *Remote) init() error { return nil } -func (f *Remote) openInfoLegacy() error { +func (f *Remote) openIndexLegacyRemote() error { if f.legacyFile != nil { return nil } - return f.openRemoteFile(consts.IndexFileSuffix, func(file storage.ImmutableFile) { f.legacyFile = file - f.legacyReader = storage.NewIndexReader( - f.readLimiter, file.Name(), - file, f.indexCache.LegacyRegistry, - ) + f.legacyReader = storage.NewIndexReader(f.readLimiter, file.Name(), file, f.indexCache.LegacyRegistry) }) } -func (f *Remote) openInfo() error { +func (f *Remote) openInfoRemote() error { if f.infoFile != nil { return nil } - - return f.openRemoteFile( - consts.InfoFileSuffix, - func(file storage.ImmutableFile) { - f.infoFile = file - }, - ) + return f.openRemoteFile(consts.InfoFileSuffix, func(file storage.ImmutableFile) { + f.infoFile = file + }) } func (f *Remote) openIndex() error { - if f.IsLegacy { - return f.openInfoLegacy() + if f.IsSingleIndex() { + return f.openIndexLegacyRemote() } - if err := f.openInfo(); err != nil { + if err := f.openInfoRemote(); err != nil { return err } if f.tokenFile == nil { - if err := f.openRemoteFile( + err := f.openRemoteFile( consts.TokenFileSuffix, func(file storage.ImmutableFile) { f.tokenFile = file @@ -354,13 +378,14 @@ func (f *Remote) openIndex() error { file, f.indexCache.TokenRegistry, ) }, - ); err != nil { + ) + if err != nil { return err } } if f.offsetsFile == nil { - if err := f.openRemoteFile( + err := f.openRemoteFile( consts.OffsetsFileSuffix, func(file storage.ImmutableFile) { f.offsetsFile = file @@ -369,13 +394,14 @@ func (f *Remote) openIndex() error { file, f.indexCache.OffsetsRegistry, ) }, - ); err != nil { + ) + if err != nil { return err } } if f.idFile == nil { - if err := f.openRemoteFile( + err := f.openRemoteFile( consts.IDFileSuffix, func(file 
storage.ImmutableFile) { f.idFile = file @@ -384,13 +410,14 @@ func (f *Remote) openIndex() error { file, f.indexCache.IDRegistry, ) }, - ); err != nil { + ) + if err != nil { return err } } if f.lidFile == nil { - if err := f.openRemoteFile( + err := f.openRemoteFile( consts.LIDFileSuffix, func(file storage.ImmutableFile) { f.lidFile = file @@ -399,7 +426,8 @@ func (f *Remote) openIndex() error { file, f.indexCache.LIDRegistry, ) }, - ); err != nil { + ) + if err != nil { return err } } @@ -482,7 +510,7 @@ func (f *Remote) computeIndexSize() { f.lidFile, } - if f.IsLegacy { + if f.IsSingleIndex() { files = []storage.ImmutableFile{ f.legacyFile, } diff --git a/frac/sealed.go b/frac/sealed.go index 4bde6d5b..a55f36a6 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -6,13 +6,13 @@ import ( "fmt" "io" "os" - "path/filepath" "sync" "go.uber.org/zap" "golang.org/x/sync/errgroup" "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" @@ -39,8 +39,6 @@ type Sealed struct { docsCache *cache.Cache[[]byte] docsReader storage.DocsReader - // IsLegacy is true for fractions that use the old single .index file format. 
- IsLegacy bool legacyFile *os.File legacyReader storage.IndexReader @@ -84,9 +82,8 @@ func NewSealed( indexCache *IndexCache, docsCache *cache.Cache[[]byte], info *common.Info, - config *Config, + cfg *Config, skipMaskProvider skipMaskProvider, - isLegacy bool, ) *Sealed { f := &Sealed{ initMu: &sync.RWMutex{}, @@ -95,10 +92,9 @@ func NewSealed( docsCache: docsCache, indexCache: indexCache, - IsLegacy: isLegacy, info: info, BaseFileName: baseFile, - Config: config, + Config: cfg, PartialSuicideMode: Off, @@ -122,7 +118,7 @@ func NewSealedPreloaded( rl *storage.ReadLimiter, indexCache *IndexCache, docsCache *cache.Cache[[]byte], - config *Config, + cfg *Config, skipMaskProvider skipMaskProvider, ) *Sealed { f := &Sealed{ @@ -137,7 +133,7 @@ func NewSealedPreloaded( info: preloaded.Info, BaseFileName: baseFile, - Config: config, + Config: cfg, skipMaskProvider: skipMaskProvider, } @@ -160,7 +156,11 @@ func NewSealedPreloaded( return f } -func (f *Sealed) openInfoLegacy() { +func (f *Sealed) IsSingleIndex() bool { + return f.info.BinaryDataVer < config.BinaryDataV3 +} + +func (f *Sealed) openIndexLegacy() { if f.legacyFile != nil { return } @@ -182,68 +182,63 @@ func (f *Sealed) openInfoLegacy() { ) } -func (f *Sealed) openInfo() { +func (f *Sealed) openInfo() error { if f.infoFile != nil { - return + return nil } - name := f.BaseFileName + consts.InfoFileSuffix - file, err := os.Open(name) + file, err := os.Open(f.BaseFileName + consts.InfoFileSuffix) if err != nil { - logger.Fatal( - "can't open info file", - zap.String("file", name), - zap.Error(err), - ) + return err } f.infoFile = file + return nil } func (f *Sealed) openIndex() { - if f.IsLegacy { + if f.IsSingleIndex() { // We have exactly one `.index` file for legacy sealed fractions. // So opening only this file is sufficient. 
- f.openInfoLegacy() + f.openIndexLegacy() return } - f.openInfo() + if err := f.openInfo(); err != nil { + logger.Fatal("can't open info file", zap.String("fraction", f.BaseFileName), zap.Error(err)) + } + if f.tokenFile == nil { - name := f.BaseFileName + consts.TokenFileSuffix - file, err := os.Open(name) + file, err := os.Open(f.BaseFileName + consts.TokenFileSuffix) if err != nil { - logger.Fatal("can't open token file", zap.String("file", name), zap.Error(err)) + logger.Fatal("can't open token file", zap.String("fraction", f.BaseFileName), zap.Error(err)) } f.tokenFile = file f.tokenReader = storage.NewIndexReader(f.readLimiter, file.Name(), file, f.indexCache.TokenRegistry) } if f.offsetsFile == nil { - name := f.BaseFileName + consts.OffsetsFileSuffix - file, err := os.Open(name) + file, err := os.Open(f.BaseFileName + consts.OffsetsFileSuffix) if err != nil { - logger.Fatal("can't open offsets file", zap.String("file", name), zap.Error(err)) + logger.Fatal("can't open offsets file", zap.String("fraction", f.BaseFileName), zap.Error(err)) } f.offsetsFile = file f.offsetsReader = storage.NewIndexReader(f.readLimiter, file.Name(), file, f.indexCache.OffsetsRegistry) } if f.idFile == nil { - name := f.BaseFileName + consts.IDFileSuffix - file, err := os.Open(name) + file, err := os.Open(f.BaseFileName + consts.IDFileSuffix) if err != nil { - logger.Fatal("can't open id file", zap.String("file", name), zap.Error(err)) + logger.Fatal("can't open id file", zap.String("fraction", f.BaseFileName), zap.Error(err)) } f.idFile = file f.idReader = storage.NewIndexReader(f.readLimiter, file.Name(), file, f.indexCache.IDRegistry) } if f.lidFile == nil { - name := f.BaseFileName + consts.LIDFileSuffix - file, err := os.Open(name) + file, err := os.Open(f.BaseFileName + consts.LIDFileSuffix) if err != nil { - logger.Fatal("can't open lid file", zap.String("file", name), zap.Error(err)) + logger.Fatal("can't open lid file", zap.String("fraction", f.BaseFileName), 
zap.Error(err)) } f.lidFile = file f.lidReader = storage.NewIndexReader(f.readLimiter, file.Name(), file, f.indexCache.LIDRegistry) @@ -280,20 +275,35 @@ func (f *Sealed) openDocs() { } func (f *Sealed) loadInfo() { - var err error + if err := f.tryLoadInfo(); err != nil { + logger.Warn( + "cannot open single info file, falling back to legacy index", + zap.String("fraction", f.BaseFileName), + zap.Error(err), + ) + f.tryLoadLegacyInfo() + } +} - if f.IsLegacy { - f.openInfoLegacy() - if f.info, err = loadInfoLegacy(f.legacyReader); err != nil { - logger.Fatal("error loading Info", zap.String("fraction", f.BaseFileName), zap.Error(err)) - } - return +func (f *Sealed) tryLoadInfo() error { + if err := f.openInfo(); err != nil { + return err + } + info, err := loadInfo(f.infoFile) + if err != nil { + logger.Fatal("error loading info", zap.String("fraction", f.BaseFileName), zap.Error(err)) } + f.info = info + return nil +} - f.openInfo() - if f.info, err = loadInfo(f.infoFile); err != nil { - logger.Fatal("error loading Info", zap.String("fraction", f.BaseFileName), zap.Error(err)) +func (f *Sealed) tryLoadLegacyInfo() { + f.openIndexLegacy() + info, err := loadInfoLegacy(f.legacyReader) + if err != nil { + logger.Fatal("error loading legacy info", zap.String("fraction", f.BaseFileName), zap.Error(err)) } + f.info = info } func (f *Sealed) init(full bool) { @@ -307,7 +317,7 @@ func (f *Sealed) init(full bool) { return } - if f.IsLegacy { + if f.IsSingleIndex() { (&LegacyLoader{}).Load(&f.blocksData, f.info, f.legacyReader) f.isInited = true return @@ -330,7 +340,7 @@ func (f *Sealed) Offload(ctx context.Context, u storage.Uploader) (bool, error) g, gctx := errgroup.WithContext(ctx) g.Go(func() error { return u.Upload(gctx, f.docsFile) }) - if f.IsLegacy { + if f.IsSingleIndex() { g.Go(func() error { return u.Upload(gctx, f.legacyFile) }) } else { g.Go(func() error { return u.Upload(gctx, f.infoFile) }) @@ -344,15 +354,18 @@ func (f *Sealed) Offload(ctx 
context.Context, u storage.Uploader) (bool, error) return true, err } - remoteFracName := f.BaseFileName + consts.RemoteFractionSuffix - file, err := os.Create(remoteFracName) + info, err := io.ReadAll(f.infoFile) + if err != nil { + return true, err + } + + remoteInfoName := f.BaseFileName + consts.RemoteFractionInfoSuffix + err = util.WriteFileAtomic(remoteInfoName, info, 0o666, ".tmp") if err != nil { return true, err } - defer file.Close() - util.MustSyncPath(filepath.Dir(remoteFracName)) - return true, nil + return true, err } func (f *Sealed) Release() { @@ -367,7 +380,7 @@ func (f *Sealed) Release() { f.lidFile, } - if f.IsLegacy { + if f.IsSingleIndex() { indexFiles = []*os.File{ f.docsFile, f.legacyFile, @@ -428,7 +441,7 @@ func (f *Sealed) Suicide() { consts.LIDFileSuffix, } - if f.IsLegacy { + if f.IsSingleIndex() { indexSuffixes = []string{ consts.IndexFileSuffix, } @@ -497,7 +510,8 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { lidReader := &f.lidReader idReader := &f.idReader - if f.IsLegacy { + isLegacy := f.IsSingleIndex() + if isLegacy { tokenReader = &f.legacyReader lidReader = &f.legacyReader idReader = &f.legacyReader @@ -514,7 +528,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(f.info.BinaryDataVer, lidReader, f.indexCache.LIDs), tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), - tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), + tokenTableLoader: token.NewTableLoader(f.BaseFileName, isLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, idsProvider: seqids.NewProvider( @@ -589,7 +603,7 @@ func (f *Sealed) computeIndexSize() { consts.LIDFileSuffix, } - if f.IsLegacy { + if f.IsSingleIndex() { suffixes = []string{ consts.IndexFileSuffix, } diff --git a/fracmanager/frac_manifest.go 
b/fracmanager/frac_manifest.go index 241006da..374e2f5f 100644 --- a/fracmanager/frac_manifest.go +++ b/fracmanager/frac_manifest.go @@ -18,13 +18,14 @@ import ( // fracManifest represents a manifest of fraction files // Contains information about the presence of various file types for a specific fraction type fracManifest struct { - basePath string // base path to fraction files (without extension) - hasDocs bool // presence of main documents file - hasMeta bool // presence of meta-information (legacy WAL format) - hasWal bool // presence of WAL with meta (new WAL format) - hasIndex bool // presence of index file - hasSdocs bool // presence of sorted documents - hasRemote bool // presence of remote fraction + basePath string // base path to fraction files (without extension) + hasDocs bool // presence of main documents file + hasMeta bool // presence of meta-information (legacy WAL format) + hasWal bool // presence of WAL with meta (new WAL format) + hasIndex bool // presence of index file + hasSdocs bool // presence of sorted documents + hasRemote bool // presence of remote fraction (legacy) + hasRemoteInfo bool // presence of .remote-info // Split index file flags hasInfo bool @@ -58,6 +59,9 @@ func (m *fracManifest) AddExtension(ext string) error { m.hasSdocs = true case consts.IndexFileSuffix: m.hasIndex = true + case consts.RemoteFractionInfoSuffix: + m.hasRemote = true + m.hasRemoteInfo = true case consts.RemoteFractionSuffix: m.hasRemote = true @@ -349,6 +353,7 @@ func (f *fracManifest) MarshalLogObject(enc zapcore.ObjectEncoder) error { enc.AddBool("hasIndex", f.hasIndex) enc.AddBool("hasSdocs", f.hasSdocs) enc.AddBool("hasRemote", f.hasRemote) + enc.AddBool("hasRemoteInfo", f.hasRemoteInfo) enc.AddBool("hasInfo", f.hasInfo) enc.AddBool("hasToken", f.hasToken) diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index 54b10979..1aa048cd 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -71,7 
+71,7 @@ func (fp *fractionProvider) NewActive(name string) *frac.Active { ) } -func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info, isLegacy bool) *frac.Sealed { +func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info) *frac.Sealed { return frac.NewSealed( name, fp.readLimiter, @@ -80,7 +80,6 @@ func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info, isLe cachedInfo, // Preloaded meta information &fp.config.Fraction, fp.skipMaskProvider, - isLegacy, ) } @@ -96,7 +95,7 @@ func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *seale ) } -func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedInfo *common.Info, isLegacy bool) *frac.Remote { +func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedInfo *common.Info) *frac.Remote { return frac.NewRemote( ctx, name, @@ -107,7 +106,6 @@ func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedIn &fp.config.Fraction, fp.s3cli, fp.skipMaskProvider, - isLegacy, ) } @@ -169,5 +167,5 @@ func (fp *fractionProvider) Offload(ctx context.Context, f *frac.Sealed) (*frac. 
} info := f.Info() - return fp.NewRemote(ctx, info.Path, info, f.IsLegacy), nil + return fp.NewRemote(ctx, info.Path, info), nil } diff --git a/fracmanager/loader.go b/fracmanager/loader.go index a3273c0c..6eb788ee 100644 --- a/fracmanager/loader.go +++ b/fracmanager/loader.go @@ -9,7 +9,6 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" - "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/logger" ) @@ -137,23 +136,9 @@ func (l *Loader) discover(ctx context.Context) ([]*frac.Active, []*frac.Sealed, case fracStageActive: actives = append(actives, l.provider.NewActive(manifest.basePath)) case fracStageSealed: - locals = append(locals, l.loadSealed(manifest, loadedInfoCache)) + locals = append(locals, l.loadSealed(manifest.basePath, loadedInfoCache)) case fracStageRemote: - // TODO(dkharms): Drop this check once we store `Info` for remote fractions locally. - - indexName := filepath.Base(manifest.basePath) + consts.IndexFileSuffix - hasIndex, err := l.provider.s3cli.Exists(ctx, indexName) - if err != nil { - logger.Error( - "will skip fraction: cannot check existence of .index file", - zap.String("fraction", filepath.Base(manifest.basePath)), - zap.Error(err), - ) - continue - } - - manifest.hasIndex = hasIndex - remotes = append(remotes, l.loadRemote(ctx, manifest, loadedInfoCache)) + remotes = append(remotes, l.loadRemote(ctx, manifest.basePath, loadedInfoCache)) default: logger.Error("unexpected fraction stage", zap.Any("manifest", manifest)) } @@ -168,21 +153,21 @@ func (l *Loader) discover(ctx context.Context) ([]*frac.Active, []*frac.Sealed, } // loadSealed loads a sealed fraction using cache -func (l *Loader) loadSealed(manifest *fracManifest, loadedInfoCache *fracInfoCache) *frac.Sealed { - info, found := loadedInfoCache.Get(filepath.Base(manifest.basePath)) +func (l *Loader) loadSealed(basePath string, loadedInfoCache *fracInfoCache) *frac.Sealed { + info, found := 
loadedInfoCache.Get(filepath.Base(basePath)) l.updateStats(found) - f := l.provider.NewSealed(manifest.basePath, info, manifest.hasIndex) + f := l.provider.NewSealed(basePath, info) l.infoCache.Add(f.Info()) return f } // loadRemote loads a remote fraction -func (l *Loader) loadRemote(ctx context.Context, manifest *fracManifest, loadedInfoCache *fracInfoCache) *frac.Remote { - info, found := loadedInfoCache.Get(filepath.Base(manifest.basePath)) +func (l *Loader) loadRemote(ctx context.Context, basePath string, loadedInfoCache *fracInfoCache) *frac.Remote { + info, found := loadedInfoCache.Get(filepath.Base(basePath)) l.updateStats(found) - f := l.provider.NewRemote(ctx, manifest.basePath, info, manifest.hasIndex) + f := l.provider.NewRemote(ctx, basePath, info) l.infoCache.Add(f.Info()) return f } diff --git a/fracmanager/loader_test.go b/fracmanager/loader_test.go index 6ba8abb6..2051cf27 100644 --- a/fracmanager/loader_test.go +++ b/fracmanager/loader_test.go @@ -3,18 +3,22 @@ package fracmanager import ( "context" "math/rand" + "os" "path/filepath" "sync" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" ) func setupLoaderTest(t testing.TB, cfg *Config) (*fractionProvider, *Loader, func()) { @@ -273,3 +277,254 @@ func TestDiscover(t *testing.T) { assert.Empty(t, expectedSealed, "we don't expect any more sealed fractions") assert.Empty(t, expectedRemote, "we don't expect any more remote fractions") } + +// createEmptyRemoteFile creates an empty .remote marker file on disk. 
+func createEmptyRemoteFile(t testing.TB, basePath string) { + t.Helper() + + err := os.WriteFile(basePath+consts.RemoteFractionSuffix, nil, 0o644) + require.NoError(t, err) +} + +// TestDiscover_RemoteInfoExists verifies that a fraction with .remote-info is detected +// as remote with the new split format (no S3 request needed). +// No .frac-cache +func TestDiscover_RemoteInfoExists(t *testing.T) { + fp, loader, tearDown := setupLoaderTest(t, nil) + defer tearDown() + + // Create a sealed fraction and offload it — this creates .remote-info on disk + // and uploads all files to S3. + a := fp.CreateActive() + appendDocsToActive(t, a, 10) + s, err := fp.Seal(a) + require.NoError(t, err) + + r, err := fp.Offload(t.Context(), s) + require.NoError(t, err) + require.NotNil(t, r) + s.Suicide() + + // Now discover from FS. + actives, locals, remotes, err := loader.discover(t.Context()) + require.NoError(t, err) + + assert.Empty(t, actives, "no active fractions expected") + assert.Empty(t, locals, "no local fractions expected") + require.Len(t, remotes, 1, "one remote fraction expected") + + remote := remotes[0] + assert.Equal(t, r.Info().Name(), remote.Info().Name(), "remote fraction name should match") + assert.False(t, remote.IsSingleIndex(), "remote fraction with .remote-info should be non-legacy") + assert.True(t, util.FileExists(remote.BaseFileName+consts.RemoteFractionInfoSuffix), "file .remote-info must exists") +} + +// TestDiscover_EmptyRemote_NewIndex verifies that a fraction with empty .remote +// and no .index in S3 (but split files exist) is detected as non-legacy remote. +// Uses a real offloaded fraction, then replaces .remote-info with empty .remote. +// No .frac-cache +func TestDiscover_EmptyRemote_NewIndex(t *testing.T) { + fp, loader, tearDown := setupLoaderTest(t, nil) + defer tearDown() + + // Create a sealed fraction and offload it — this creates real files in S3. 
+ a := fp.CreateActive() + appendDocsToActive(t, a, 10) + s, err := fp.Seal(a) + require.NoError(t, err) + + r, err := fp.Offload(t.Context(), s) + require.NoError(t, err) + require.NotNil(t, r) + s.Suicide() + + basePath := r.BaseFileName + + // Remove .remote-info and create empty .remote marker instead. + err = os.Remove(basePath + consts.RemoteFractionInfoSuffix) + require.NoError(t, err) + createEmptyRemoteFile(t, basePath) + + // Discover from FS. + actives, locals, remotes, err := loader.discover(t.Context()) + require.NoError(t, err) + + assert.Empty(t, actives, "no active fractions expected") + assert.Empty(t, locals, "no local fractions expected") + require.Len(t, remotes, 1, "one remote fraction expected") + + remote := remotes[0] + assert.Equal(t, r.Info().Name(), remote.Info().Name(), "remote fraction name should match") + assert.False(t, remote.IsSingleIndex(), "remote fraction without .index in S3 should be non-legacy") +} + +// TestDiscover_EmptyRemote_CacheLegacy verifies that a fraction with empty .remote +// and cached Info with BinaryDataVer < V3 is detected as legacy remote. +func TestDiscover_EmptyRemote_CacheLegacy(t *testing.T) { + fp, loader, tearDown := setupLoaderTest(t, nil) + defer tearDown() + + // Create a sealed fraction and offload it. + a := fp.CreateActive() + appendDocsToActive(t, a, 10) + s, err := fp.Seal(a) + require.NoError(t, err) + + r, err := fp.Offload(t.Context(), s) + require.NoError(t, err) + require.NotNil(t, r) + s.Suicide() + + basePath := r.BaseFileName + baseName := r.Info().Name() + + // Remove .remote-info and create empty .remote marker instead. + err = os.Remove(basePath + consts.RemoteFractionInfoSuffix) + require.NoError(t, err) + createEmptyRemoteFile(t, basePath) + + // Add cached Info with BinaryDataVer < V3 (simulating legacy) + // and IndexOnDisk > 0 so NewRemote fast path (info.IndexOnDisk > 0) works. 
+ cachedInfo := &common.Info{ + Path: basePath, + DocsTotal: r.Info().DocsTotal, + BinaryDataVer: config.BinaryDataV2, // < V3 — legacy + IndexOnDisk: 4096, // > 0 — enables fast path in NewRemote + } + loader.infoCache.Add(cachedInfo) + err = loader.infoCache.SyncWithDisk() + require.NoError(t, err) + + // Discover from FS. + actives, locals, remotes, err := loader.discover(t.Context()) + require.NoError(t, err) + + assert.Empty(t, actives, "no active fractions expected") + assert.Empty(t, locals, "no local fractions expected") + require.Len(t, remotes, 1, "one remote fraction expected") + + remote := remotes[0] + assert.Equal(t, baseName, remote.Info().Name(), "remote fraction name should match") + assert.True(t, remote.IsSingleIndex(), "remote fraction with cached BinaryDataVer<V3 should be legacy") +} + +// TestDiscover_EmptyRemote_CacheNew verifies that a fraction with empty .remote +// and cached Info with BinaryDataVer >= V3 is detected as non-legacy remote. +// Since split format is known from cache, no S3 request should be made. +func TestDiscover_EmptyRemote_CacheNew(t *testing.T) { + fp, loader, tearDown := setupLoaderTest(t, nil) + defer tearDown() + + // Create a sealed fraction and offload it. + a := fp.CreateActive() + appendDocsToActive(t, a, 10) + s, err := fp.Seal(a) + require.NoError(t, err) + + // Add cached Info and sync to disk so loadedInfoCache inside discover() picks it up. + loader.infoCache.Add(s.Info()) + err = loader.infoCache.SyncWithDisk() + require.NoError(t, err) + + // Offload and remove locally + r, err := fp.Offload(t.Context(), s) + require.NoError(t, err) + require.NotNil(t, r) + s.Suicide() + + // Remove .remote-info and create empty .remote marker instead. + basePath := r.BaseFileName + err = os.Remove(basePath + consts.RemoteFractionInfoSuffix) + require.NoError(t, err) + createEmptyRemoteFile(t, basePath) + + // Discover from FS. 
+ actives, locals, remotes, err := loader.discover(t.Context()) + require.NoError(t, err) + + assert.Empty(t, actives, "no active fractions expected") + assert.Empty(t, locals, "no local fractions expected") + require.Len(t, remotes, 1, "one remote fraction expected") + + remote := remotes[0] + assert.Equal(t, r.Info().Name(), remote.Info().Name(), "remote fraction name should match") + assert.False(t, remote.IsSingleIndex(), "remote fraction with cached BinaryDataVer>=V3 should be non-legacy") +} + +// TestLoadRemote_Legacy verifies loading a legacy remote fraction using cached Info +// with IndexOnDisk > 0 (fast path — no S3 request for info loading). +func TestLoadRemote_Legacy(t *testing.T) { + fp, loader, tearDown := setupLoaderTest(t, nil) + defer tearDown() + + baseName := "seq-db-TESTLEGACYLOAD" + basePath := filepath.Join(fp.config.DataDir, baseName) + + // Create cached Info with IndexOnDisk > 0 (legacy, fast path). + cachedInfo := &common.Info{ + Path: basePath, + BinaryDataVer: config.BinaryDataV2, + DocsTotal: 100, + IndexOnDisk: 4096, + } + loadedInfoCache := NewFracInfoCacheFromDisk(loader.infoCache.fullPath) + loadedInfoCache.Add(cachedInfo) + + remote := loader.loadRemote(t.Context(), basePath, loadedInfoCache) + require.NotNil(t, remote) + assert.True(t, remote.IsSingleIndex(), "should be legacy") +} + +// TestLoadRemote_NewFormat verifies loading a new-format remote fraction with .remote-info. +func TestLoadRemote_NewFormat(t *testing.T) { + fp, loader, tearDown := setupLoaderTest(t, nil) + defer tearDown() + + // Create a sealed fraction and offload it. 
+ a := fp.CreateActive() + appendDocsToActive(t, a, 10) + s, err := fp.Seal(a) + require.NoError(t, err) + + r, err := fp.Offload(t.Context(), s) + require.NoError(t, err) + require.NotNil(t, r) + s.Suicide() + + loadedInfoCache := NewFracInfoCacheFromDisk(loader.infoCache.fullPath) + remote := loader.loadRemote(t.Context(), r.BaseFileName, loadedInfoCache) + require.NotNil(t, remote) + assert.False(t, remote.IsSingleIndex(), "should be non-legacy") + assert.Equal(t, r.Info().Name(), remote.Info().Name(), "name should match") +} + +// TestLoadRemote_RemoteInfoFallback verifies loading a remote fraction where +// .remote-info is missing but .info exists in S3. +// Uses a real offloaded fraction to ensure valid .info file in S3. +func TestLoadRemote_RemoteInfoFallback(t *testing.T) { + fp, loader, tearDown := setupLoaderTest(t, nil) + defer tearDown() + + // Create a sealed fraction and offload it — this uploads valid files to S3. + a := fp.CreateActive() + appendDocsToActive(t, a, 10) + s, err := fp.Seal(a) + require.NoError(t, err) + + r, err := fp.Offload(t.Context(), s) + require.NoError(t, err) + require.NotNil(t, r) + s.Suicide() + + basePath := r.BaseFileName + + // Remove .remote-info so loadInfo() falls back to S3. + err = os.Remove(basePath + consts.RemoteFractionInfoSuffix) + require.NoError(t, err) + + loadedInfoCache := NewFracInfoCacheFromDisk(loader.infoCache.fullPath) + remote := loader.loadRemote(t.Context(), basePath, loadedInfoCache) + require.NotNil(t, remote) + assert.False(t, remote.IsSingleIndex(), "should be non-legacy") + assert.Equal(t, r.Info().Name(), remote.Info().Name(), "name should match") +} diff --git a/util/fs.go b/util/fs.go index 861293ca..d3f48df8 100644 --- a/util/fs.go +++ b/util/fs.go @@ -161,3 +161,8 @@ func CopyFile(src, dst string) error { _, err = io.Copy(out, in) return err } + +func FileExists(filename string) bool { + _, err := os.Stat(filename) + return !os.IsNotExist(err) +}