From 16d69e15feb42ad722b69d9aecb794a9510d9041 Mon Sep 17 00:00:00 2001
From: secwall
Date: Mon, 30 Mar 2026 17:36:52 +0200
Subject: [PATCH] Reconnect to dcs if we are too long in lost state

---
Review notes (not part of the commit message):
 * A non-positive dcs_reconnect_timeout now disables forced reconnects
   instead of triggering one on every pass through the Lost handler.
 * reconnectDCS no longer logs the failure itself; stateLost already logs
   the returned error, so every failure is reported exactly once.
 * The time spent in Lost is computed once per check.
 * Whitespace of context lines was reconstructed from a flattened copy of
   this patch and post-image blob hashes were not regenerated; use
   `git apply --ignore-whitespace` if a hunk does not match.

 internal/app/app.go       | 27 +++++++++++++++++++++++++++
 internal/app/lost.go      | 15 +++++++++++++++
 internal/config/config.go |  2 ++
 internal/valkey/shard.go  |  7 +++++++
 4 files changed, 51 insertions(+)

diff --git a/internal/app/app.go b/internal/app/app.go
index cd79949..0c9f532 100644
--- a/internal/app/app.go
+++ b/internal/app/app.go
@@ -21,6 +21,7 @@ import (
 type App struct {
 	dcsDivergeTime time.Time
 	replFailTime   time.Time
+	lostSince      time.Time // entry into Lost or last reconnect attempt; zero outside Lost
 	critical       atomic.Value
 	ctx            context.Context
 	dcs            dcs.DCS
@@ -108,6 +109,26 @@ func (app *App) connectDCS() error {
 	return nil
 }
 
+// reconnectDCS replaces the current DCS connection with a freshly
+// established one after the app has stayed in the Lost state for too long.
+// On failure the previous connection is put back and the error is returned
+// for the caller to log. On success the disconnect callback is registered on
+// the new connection, the shard is pointed at it and the old one is closed.
+func (app *App) reconnectDCS() error {
+	app.logger.Info("Attempting DCS reconnection after prolonged Lost state")
+	oldDCS := app.dcs
+	if err := app.connectDCS(); err != nil {
+		// NOTE(review): presumably connectDCS can overwrite app.dcs before failing - confirm.
+		app.dcs = oldDCS
+		return err
+	}
+	app.dcs.SetDisconnectCallback(func() error { return app.handleCritical() })
+	app.shard.SetDCS(app.dcs)
+	oldDCS.Close()
+	app.logger.Info("DCS reconnection successful")
+	return nil
+}
+
 func (app *App) lockDaemonFile() {
 	app.daemonLock = flock.New(app.config.DaemonLockFile)
 	if locked, err := app.daemonLock.TryLock(); !locked {
@@ -184,6 +205,12 @@ func (app *App) Run() int {
 				if nextState == app.state {
 					break
 				}
+				// Stamp the moment we enter the Lost state and clear the stamp when we leave it.
+				if nextState == stateLost && app.state != stateLost {
+					app.lostSince = time.Now()
+				} else if nextState != stateLost && app.state == stateLost {
+					app.lostSince = time.Time{}
+				}
 				app.state = nextState
 			}
 		case <-app.ctx.Done():
diff --git a/internal/app/lost.go b/internal/app/lost.go
index 507c43f..2b3b957 100644
--- a/internal/app/lost.go
+++ b/internal/app/lost.go
@@ -3,12 +3,27 @@ package app
 import (
 	"fmt"
 	"log/slog"
+	"time"
 )
 
 func (app *App) stateLost() appState {
 	if app.dcs.IsConnected() {
 		return stateCandidate
 	}
+	// Force a DCS reconnect once we have been Lost for at least the configured
+	// timeout; a non-positive timeout disables the mechanism entirely.
+	if timeout := app.config.DcsReconnectTimeout; timeout > 0 && !app.lostSince.IsZero() {
+		if lostFor := time.Since(app.lostSince); lostFor >= timeout {
+			app.logger.Warn(fmt.Sprintf("Lost state persisted for %s, attempting DCS reconnection", lostFor.Truncate(time.Second)))
+			err := app.reconnectDCS()
+			// Restart the timer so the next attempt waits a full timeout again.
+			app.lostSince = time.Now()
+			if err != nil {
+				app.logger.Error("DCS reconnection attempt failed, will retry later", slog.Any("error", err))
+			}
+			return stateLost
+		}
+	}
 	if len(app.shard.Hosts()) == 1 {
 		return stateLost
 	}
diff --git a/internal/config/config.go b/internal/config/config.go
index 7da5eb5..280796a 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -79,6 +79,7 @@ type Config struct {
 	InfoFileHandlerInterval time.Duration `yaml:"info_file_handler_interval"`
 	InactivationDelay       time.Duration `yaml:"inactivation_delay"`
 	DcsWaitTimeout          time.Duration `yaml:"dcs_wait_timeout"`
+	DcsReconnectTimeout     time.Duration `yaml:"dcs_reconnect_timeout"`
 	TickInterval            time.Duration `yaml:"tick_interval"`
 	PingStable              int           `yaml:"ping_stable"`
 }
@@ -178,6 +179,7 @@ func DefaultConfig() (Config, error) {
 		PprofAddr:               "",
 		Zookeeper:               zkConfig,
 		DcsWaitTimeout:          10 * time.Second,
+		DcsReconnectTimeout:     2 * time.Minute,
 		Valkey:                  DefaultValkeyConfig(),
 		SentinelMode:            sentinelConf,
 	}
diff --git a/internal/valkey/shard.go b/internal/valkey/shard.go
index 2d1a21a..274b746 100644
--- a/internal/valkey/shard.go
+++ b/internal/valkey/shard.go
@@ -38,6 +38,13 @@ func NewShard(config *config.Config, logger *slog.Logger, dcs dcs.DCS) *Shard {
 	return s
 }
 
+// SetDCS replaces the DCS handle used by the shard; the swap is done under the shard lock.
+func (s *Shard) SetDCS(newDCS dcs.DCS) {
+	s.Lock()
+	defer s.Unlock()
+	s.dcs = newDCS
+}
+
 // GetShardHostsFromDcs returns current shard hosts from dcs state
 func (s *Shard) GetShardHostsFromDcs() ([]string, error) {
 	fqdns, err := s.dcs.GetChildren(dcs.PathHANodesPrefix)