From 9a30a00b124bfd24462a823898cec0d8d5fe471e Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Wed, 3 Jun 2026 20:58:00 +0300 Subject: [PATCH 1/3] refactor: move WAL corruption metric to periodic stats worker counting broken .docs files --- frac/active.go | 3 +-- fracmanager/fracmanager.go | 25 +++++++++++++++++++++++-- fracmanager/metrics.go | 7 +++++++ metric/store.go | 6 ------ 4 files changed, 31 insertions(+), 10 deletions(-) diff --git a/frac/active.go b/frac/active.go index e5484b08c..2907d9c5e 100644 --- a/frac/active.go +++ b/frac/active.go @@ -212,7 +212,6 @@ func (f *Active) replayWalFile(ctx context.Context) error { wg.Wait() if corruptions > 0 { - metric.WALCorruptionsTotal.Add(float64(corruptions)) if err := f.backupCorruptedFiles(); err != nil { logger.Error("failed to copy a corrupted WAL file", zap.String("name", f.info.Name()), @@ -238,7 +237,7 @@ func (f *Active) replayWalFile(ctx context.Context) error { // backupCorruptedFiles saves wal and docs file in a directory with corrupted files for later analysis func (f *Active) backupCorruptedFiles() error { brokenDir := filepath.Join(filepath.Dir(f.BaseFileName), consts.BrokenDir) - if err := os.MkdirAll(brokenDir, 0o777); err != nil { + if err := os.MkdirAll(brokenDir, 0o777); err != nil && !os.IsExist(err) { return fmt.Errorf("create dir %s, err: %w", brokenDir, err) } diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 77a73c787..569c5cec0 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -2,7 +2,9 @@ package fracmanager import ( "context" + "os" "path/filepath" + "strings" "sync" "time" @@ -65,7 +67,7 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk wg := sync.WaitGroup{} ctx, cancel := context.WithCancel(ctx) - startStatsWorker(ctx, registry, &wg) + startStatsWorker(ctx, cfg, registry, &wg) startMaintWorker(ctx, cfg, &fm, &wg) startCacheWorker(ctx, cfg, cache, &wg) @@ -159,7 +161,7 @@ func startCacheWorker(ctx context.Context, cfg *Config, cache *CacheMaintainer, } // startStatsWorker starts periodic statistics collection and reporting -func startStatsWorker(ctx context.Context, reg *fractionRegistry, wg *sync.WaitGroup) { +func startStatsWorker(ctx context.Context, cfg *Config, reg *fractionRegistry, wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() @@ -170,11 +172,30 @@ func startStatsWorker(ctx context.Context, reg *fractionRegistry, wg *sync.WaitG stats := reg.Stats() stats.Log() // Log statistics stats.SetMetrics() // Update Prometheus metrics + + corruptions := countDocsFiles(filepath.Join(cfg.DataDir, consts.BrokenDir)) + walCorruptionsTotal.Add(float64(corruptions)) }) logger.Info("stats loop is stopped") }() } +func countDocsFiles(dir string) int { + entries, err := os.ReadDir(dir) + if err != nil && !os.IsNotExist(err) { + logger.Error("error reading directory", zap.Error(err)) + return 0 + } + + count := 0 + for _, entry := range entries { + if !entry.IsDir() && strings.HasSuffix(entry.Name(), consts.DocsFileSuffix) { + count++ + } + } + return count +} + // startMaintWorker starts periodic fraction maintenance operations func startMaintWorker(ctx context.Context, cfg *Config, fm *FracManager, wg *sync.WaitGroup) { wg.Add(1) diff --git a/fracmanager/metrics.go b/fracmanager/metrics.go index cdd5030de..534a3fa36 100644 --- a/fracmanager/metrics.go +++ b/fracmanager/metrics.go @@ -212,4 +212,11 @@ var ( Name: "bytes_read_total", Help: "Number of bytes read from disk storage", }) + + walCorruptionsTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "seq_db_store", + Subsystem: "storage", + Name: "wal_corruptions_total", + Help: "Number of WAL files with detected corruption during replay", + }) ) diff --git a/metric/store.go b/metric/store.go index 212abf957..54e26b547 100644 --- a/metric/store.go +++ b/metric/store.go @@ -123,12 +123,6 @@ var ( Name: "panics_total", Help: "Number of panics in store", }) - WALCorruptionsTotal = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "seq_db_store", - Subsystem: "storage", - Name: "wal_corruptions_total", - Help: "Number of detected WAL corruption ranges during replay", - }) skippedIndexes = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "seq_db_store", From 17823a9bb5697ad28cd43fd67012fc36f35df0ad Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Tue, 9 Jun 2026 19:25:39 +0300 Subject: [PATCH 2/3] review fixes --- .gitignore | 2 +- fracmanager/metrics.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 222f00958..b1afb5356 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,4 @@ junit.xml /go.work* /data /quickstart/data -/.devcontainer \ No newline at end of file +/.devcontainer diff --git a/fracmanager/metrics.go b/fracmanager/metrics.go index 534a3fa36..016ecfca7 100644 --- a/fracmanager/metrics.go +++ b/fracmanager/metrics.go @@ -213,7 +213,7 @@ var ( Help: "Number of bytes read from disk storage", }) - walCorruptionsTotal = promauto.NewCounter(prometheus.CounterOpts{ + walCorruptionsTotal = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "seq_db_store", Subsystem: "storage", Name: "wal_corruptions_total", From 726a14d19738f54304a5072571704b7669fa8087 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Tue, 9 Jun 2026 19:28:22 +0300 Subject: [PATCH 3/3] review fixes 2 --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index b1afb5356..222f00958 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,4 @@ junit.xml /go.work* /data /quickstart/data -/.devcontainer +/.devcontainer \ No newline at end of file