diff --git a/frac/active.go b/frac/active.go index e5484b08c..2907d9c5e 100644 --- a/frac/active.go +++ b/frac/active.go @@ -212,7 +212,6 @@ func (f *Active) replayWalFile(ctx context.Context) error { wg.Wait() if corruptions > 0 { - metric.WALCorruptionsTotal.Add(float64(corruptions)) if err := f.backupCorruptedFiles(); err != nil { logger.Error("failed to copy a corrupted WAL file", zap.String("name", f.info.Name()), @@ -238,7 +237,7 @@ func (f *Active) replayWalFile(ctx context.Context) error { // backupCorruptedFiles saves wal and docs file in a directory with corrupted files for later analysis func (f *Active) backupCorruptedFiles() error { brokenDir := filepath.Join(filepath.Dir(f.BaseFileName), consts.BrokenDir) - if err := os.MkdirAll(brokenDir, 0o777); err != nil { + if err := os.MkdirAll(brokenDir, 0o777); err != nil && !os.IsExist(err) { return fmt.Errorf("create dir %s, err: %w", brokenDir, err) } diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 77a73c787..569c5cec0 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -2,7 +2,9 @@ package fracmanager import ( "context" + "os" "path/filepath" + "strings" "sync" "time" @@ -65,7 +67,7 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk wg := sync.WaitGroup{} ctx, cancel := context.WithCancel(ctx) - startStatsWorker(ctx, registry, &wg) + startStatsWorker(ctx, cfg, registry, &wg) startMaintWorker(ctx, cfg, &fm, &wg) startCacheWorker(ctx, cfg, cache, &wg) @@ -159,7 +161,7 @@ func startCacheWorker(ctx context.Context, cfg *Config, cache *CacheMaintainer, } // startStatsWorker starts periodic statistics collection and reporting -func startStatsWorker(ctx context.Context, reg *fractionRegistry, wg *sync.WaitGroup) { +func startStatsWorker(ctx context.Context, cfg *Config, reg *fractionRegistry, wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() @@ -170,11 +172,30 @@ func startStatsWorker(ctx context.Context, reg *fractionRegistry, wg *sync.WaitG stats := reg.Stats() stats.Log() // Log statistics stats.SetMetrics() // Update Prometheus metrics + + corruptions := countDocsFiles(filepath.Join(cfg.DataDir, consts.BrokenDir)) + walCorruptionsTotal.Add(float64(corruptions)) }) logger.Info("stats loop is stopped") }() } +func countDocsFiles(dir string) int { + entries, err := os.ReadDir(dir) + if err != nil && !os.IsNotExist(err) { + logger.Error("error reading directory", zap.Error(err)) + return 0 + } + + count := 0 + for _, entry := range entries { + if !entry.IsDir() && strings.HasSuffix(entry.Name(), consts.DocsFileSuffix) { + count++ + } + } + return count +} + // startMaintWorker starts periodic fraction maintenance operations func startMaintWorker(ctx context.Context, cfg *Config, fm *FracManager, wg *sync.WaitGroup) { wg.Add(1) diff --git a/fracmanager/metrics.go b/fracmanager/metrics.go index cdd5030de..016ecfca7 100644 --- a/fracmanager/metrics.go +++ b/fracmanager/metrics.go @@ -212,4 +212,11 @@ var ( Name: "bytes_read_total", Help: "Number of bytes read from disk storage", }) + + walCorruptionsTotal = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "storage", + Name: "wal_corruptions_total", + Help: "Number of WAL files with detected corruption during replay", + }) ) diff --git a/metric/store.go b/metric/store.go index 212abf957..54e26b547 100644 --- a/metric/store.go +++ b/metric/store.go @@ -123,12 +123,6 @@ var ( Name: "panics_total", Help: "Number of panics in store", }) - WALCorruptionsTotal = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "seq_db_store", - Subsystem: "storage", - Name: "wal_corruptions_total", - Help: "Number of detected WAL corruption ranges during replay", - }) skippedIndexes = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "seq_db_store",