Skip to content

Commit bab8a1d

Browse files
committed
Bug fix.
1 parent 60880b5 commit bab8a1d

File tree

4 files changed

+149
-28
lines changed

4 files changed

+149
-28
lines changed

db/badger.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,9 @@ func (b *BadgerDB) startMaintenance() {
170170
for {
171171
select {
172172
case <-ticker.C:
173-
// 值日志GC - 智能阈值
174-
err := b.db.RunValueLogGC(0.7)
175-
if err != nil && err != badger.ErrNoRewrite {
173+
// 值日志GC - 单次周期内尝试多轮回收,提升清理效果
174+
_, _, err := b.RunValueLogGCWithStats(0.7, 3)
175+
if err != nil {
176176
log.Printf("Value log GC failed: %v", err)
177177
}
178178

@@ -188,6 +188,28 @@ func (b *BadgerDB) startMaintenance() {
188188
}()
189189
}
190190

191+
// RunValueLogGCWithStats runs bounded ValueLog GC rounds and returns execution stats.
192+
func (b *BadgerDB) RunValueLogGCWithStats(discardRatio float64, maxRounds int) (int, bool, error) {
193+
if maxRounds <= 0 {
194+
maxRounds = 1
195+
}
196+
197+
rounds := 0
198+
for i := 0; i < maxRounds; i++ {
199+
err := b.db.RunValueLogGC(discardRatio)
200+
if err == nil {
201+
rounds++
202+
continue
203+
}
204+
if err == badger.ErrNoRewrite {
205+
return rounds, true, nil
206+
}
207+
return rounds, false, err
208+
}
209+
210+
return rounds, false, nil
211+
}
212+
191213
// SaveModel saves a model to the database
192214
func (b *BadgerDB) SaveModel(modelType string, id uint64, model interface{}) error {
193215
key := fmt.Sprintf("%s:%d", modelType, id)

db/model_ops.go

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"log"
77
"sort"
88
"strconv"
9+
"strings"
910
"sync"
1011
"time"
1112

@@ -371,24 +372,48 @@ func (o *MonitorHistoryOps) GetMonitorHistoriesByServerAndMonitorRangeReverseLim
371372

372373
// CleanupOldMonitorHistories removes monitor histories older than maxAge
373374
func (o *MonitorHistoryOps) CleanupOldMonitorHistories(maxAge time.Duration) (int, error) {
374-
// Get all monitor IDs
375-
serverOps := NewServerOps(o.db)
376-
servers, err := serverOps.GetAllServers()
375+
cutoffNano := time.Now().Add(-maxAge).UnixNano()
376+
keys, err := o.db.GetKeysWithPrefix("monitor_history:")
377377
if err != nil {
378378
return 0, err
379379
}
380380

381-
totalDeleted := 0
382-
for _, server := range servers {
383-
prefix := fmt.Sprintf("monitor_history:%d", server.ID)
384-
deleted, err := o.db.CleanupExpiredData(prefix, maxAge)
385-
if err != nil {
386-
return totalDeleted, err
381+
keysToDelete := make([]string, 0, len(keys)/2)
382+
for _, key := range keys {
383+
idx := strings.LastIndexByte(key, ':')
384+
if idx <= 0 || idx+1 >= len(key) {
385+
continue
386+
}
387+
388+
ts, parseErr := strconv.ParseInt(key[idx+1:], 10, 64)
389+
if parseErr != nil {
390+
continue
391+
}
392+
393+
if ts <= cutoffNano {
394+
keysToDelete = append(keysToDelete, key)
387395
}
388-
totalDeleted += deleted
389396
}
390397

391-
return totalDeleted, nil
398+
if len(keysToDelete) == 0 {
399+
return 0, nil
400+
}
401+
402+
// Batch delete in one write transaction to reduce lock and txn overhead.
403+
o.db.rwMutex.Lock()
404+
defer o.db.rwMutex.Unlock()
405+
if err := o.db.db.Update(func(txn *badger.Txn) error {
406+
for _, key := range keysToDelete {
407+
if delErr := txn.Delete([]byte(key)); delErr != nil {
408+
return delErr
409+
}
410+
}
411+
return nil
412+
}); err != nil {
413+
return 0, err
414+
}
415+
416+
return len(keysToDelete), nil
392417
}
393418

394419
// UserOps provides specialized operations for users

service/singleton/crontask.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ func InitCronTask() {
7272
}
7373
} else {
7474
log.Println("BadgerDB模式:跳过注册GORM相关的定时任务")
75+
76+
// 每天凌晨 4:15 执行一次深度 value log GC,作为低峰期空间回收补充
77+
if _, err := Cron.AddFunc("0 15 4 * * *", func() {
78+
runBadgerValueLogGC("nightly-deep", 0.55, 20)
79+
}); err != nil {
80+
panic(err)
81+
}
7582
}
7683
}
7784

service/singleton/singleton.go

Lines changed: 81 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,7 @@ func CleanMonitorHistory() (int64, error) {
721721
// 只在清理了记录时才输出日志
722722
if count > 0 {
723723
log.Printf("BadgerDB监控历史清理完成,清理了%d条记录", count)
724+
runBadgerValueLogGC("post-cleanup", 0.65, 10)
724725
}
725726
return int64(count), nil
726727
}
@@ -951,6 +952,28 @@ func CleanMonitorHistory() (int64, error) {
951952
return totalCleaned, nil
952953
}
953954

955+
func runBadgerValueLogGC(trigger string, discardRatio float64, maxRounds int) {
956+
if Conf == nil || Conf.DatabaseType != "badger" || db.DB == nil {
957+
return
958+
}
959+
960+
if isSystemBusy() {
961+
log.Printf("系统繁忙,跳过BadgerDB ValueLog GC(%s)", trigger)
962+
return
963+
}
964+
965+
start := time.Now()
966+
rounds, noRewrite, err := db.DB.RunValueLogGCWithStats(discardRatio, maxRounds)
967+
if err != nil {
968+
log.Printf("BadgerDB ValueLog GC失败(%s, ratio=%.2f, rounds=%d): %v", trigger, discardRatio, maxRounds, err)
969+
return
970+
}
971+
972+
if rounds > 0 || noRewrite {
973+
log.Printf("BadgerDB ValueLog GC完成(%s): ratio=%.2f, rounds=%d, noRewrite=%t, cost=%s", trigger, discardRatio, rounds, noRewrite, time.Since(start))
974+
}
975+
}
976+
954977
// CleanCumulativeTransferData 清理累计流量数据
955978
func CleanCumulativeTransferData(days int) {
956979
// 废弃TrafficManager, 仅保留函数框架
@@ -2363,6 +2386,10 @@ var (
23632386
monitorHistoryBatchBufferLock sync.Mutex
23642387
monitorHistoryBatchSize = 100 // 默认批量大小,从50增加到100
23652388
monitorHistoryBatchInterval = 10 * time.Second // 从3秒增加到10秒,降低处理频率
2389+
2390+
dbMaintenanceSchedulerStarted int32
2391+
dbOptimizeRunning int32
2392+
dbWALCheckpointRunning int32
23662393
)
23672394

23682395
// StartDBWriteWorker 启动数据库写入工作器
@@ -2953,45 +2980,85 @@ func StartDatabaseMaintenanceScheduler() {
29532980
return
29542981
}
29552982

2983+
// 避免重复启动维护调度器导致多个ticker并发执行
2984+
if !atomic.CompareAndSwapInt32(&dbMaintenanceSchedulerStarted, 0, 1) {
2985+
log.Println("数据库维护计划已启动,跳过重复启动")
2986+
return
2987+
}
2988+
29562989
log.Println("启动数据库维护计划...")
29572990

29582991
// 每6小时执行一次数据库优化
29592992
go func() {
2960-
// 启动时延迟30分钟再执行第一次优化,避免与启动时的其他操作冲突
2961-
time.Sleep(30 * time.Minute)
2993+
// 启动时延迟15分钟执行第一次优化,减少长期运行前的碎片积累
2994+
time.Sleep(15 * time.Minute)
2995+
runSQLiteMaintenanceCycle("startup")
29622996

29632997
ticker := time.NewTicker(6 * time.Hour)
29642998
defer ticker.Stop()
29652999

29663000
for range ticker.C {
2967-
if !isSystemBusy() {
2968-
OptimizeDatabase()
2969-
} else {
2970-
log.Println("系统繁忙,跳过定期数据库优化")
2971-
}
3001+
runSQLiteMaintenanceCycle("periodic")
29723002
}
29733003
}()
29743004

29753005
// 安排更频繁的WAL检查点(每小时)
29763006
go func() {
29773007
// 启动时延迟5分钟
29783008
time.Sleep(5 * time.Minute)
3009+
runSQLiteWALCheckpoint("startup")
29793010

29803011
ticker := time.NewTicker(1 * time.Hour)
29813012
defer ticker.Stop()
29823013

29833014
for range ticker.C {
2984-
if !isSystemBusy() {
2985-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
2986-
if err := DB.WithContext(ctx).Exec("PRAGMA wal_checkpoint(PASSIVE)").Error; err != nil {
2987-
log.Printf("执行WAL检查点失败: %v", err)
2988-
}
2989-
cancel()
2990-
}
3015+
runSQLiteWALCheckpoint("periodic")
29913016
}
29923017
}()
29933018
}
29943019

3020+
func runSQLiteMaintenanceCycle(trigger string) {
3021+
if Conf.DatabaseType == "badger" || DB == nil {
3022+
return
3023+
}
3024+
3025+
if !atomic.CompareAndSwapInt32(&dbOptimizeRunning, 0, 1) {
3026+
log.Printf("数据库优化任务仍在执行,跳过本次(%s)", trigger)
3027+
return
3028+
}
3029+
defer atomic.StoreInt32(&dbOptimizeRunning, 0)
3030+
3031+
if isSystemBusy() {
3032+
log.Printf("系统繁忙,跳过定期数据库优化(%s)", trigger)
3033+
return
3034+
}
3035+
3036+
OptimizeDatabase()
3037+
}
3038+
3039+
func runSQLiteWALCheckpoint(trigger string) {
3040+
if Conf.DatabaseType == "badger" || DB == nil {
3041+
return
3042+
}
3043+
3044+
if !atomic.CompareAndSwapInt32(&dbWALCheckpointRunning, 0, 1) {
3045+
log.Printf("WAL检查点任务仍在执行,跳过本次(%s)", trigger)
3046+
return
3047+
}
3048+
defer atomic.StoreInt32(&dbWALCheckpointRunning, 0)
3049+
3050+
if isSystemBusy() {
3051+
log.Printf("系统繁忙,跳过WAL检查点(%s)", trigger)
3052+
return
3053+
}
3054+
3055+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
3056+
defer cancel()
3057+
if err := DB.WithContext(ctx).Exec("PRAGMA wal_checkpoint(PASSIVE)").Error; err != nil {
3058+
log.Printf("执行WAL检查点失败(%s): %v", trigger, err)
3059+
}
3060+
}
3061+
29953062
// SafeUpdateServerStatus 使用无事务模式更新服务器状态,避免锁竞争
29963063
func SafeUpdateServerStatus(serverID uint64, updates map[string]interface{}) error {
29973064
// 检查是否使用 BadgerDB

0 commit comments

Comments
 (0)